do more insertion batching for db in importers

This commit is contained in:
Joe Ardent 2024-01-28 20:50:29 -08:00
parent 4343abfb7b
commit 14925684ee
3 changed files with 23 additions and 12 deletions

View file

@@ -34,7 +34,7 @@ fn main() {
let cli = Cli::parse(); let cli = Cli::parse();
let ids = rt.block_on(import_watches(&w2w_db, &cli)); let ids = rt.block_on(import_watches(&w2w_db, &cli));
rt.block_on(save_ids(&cli.db_path, &ids)); rt.block_on(save_ids(&cli.db_path, ids));
} }
async fn import_watches(w2w_db: &SqlitePool, cli: &Cli) -> IdMap { async fn import_watches(w2w_db: &SqlitePool, cli: &Cli) -> IdMap {
@@ -59,28 +59,38 @@ async fn import_watches(w2w_db: &SqlitePool, cli: &Cli) -> IdMap {
map map
} }
async fn save_ids(path: &OsStr, ids: &IdMap) { async fn save_ids(path: &OsStr, ids: IdMap) {
let path = Path::new(path); let path = Path::new(path);
let file = path.file_name().unwrap(); let file = path.file_name().unwrap();
let file = file.to_str().unwrap(); let file = file.to_str().unwrap();
let path = format!("{}/w2w-{file}", path.parent().unwrap().to_str().unwrap()); let path = format!("{}/w2w-{file}", path.parent().unwrap().to_str().unwrap());
let conn_opts = SqliteConnectOptions::new() let conn_opts = SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.filename(path) .filename(path)
.create_if_missing(true); .create_if_missing(true)
.pragma("mmap_size", "3000000000");
let mut conn = SqliteConnection::connect_with(&conn_opts).await.unwrap(); let mut conn = SqliteConnection::connect_with(&conn_opts).await.unwrap();
let create = let create =
"create table if not exists id_map (imdb text not null primary key, w2w blob not null)"; "create table if not exists id_map (imdb text not null primary key, w2w blob not null)";
let _ = sqlx::query(create).execute(&mut conn).await.unwrap(); let _ = sqlx::query(create).execute(&mut conn).await.unwrap();
for (imdb, w2w) in ids.iter() { let limit = 5000;
sqlx::query("insert into id_map (imdb, w2w) values (?, ?)") let num_ids = ids.len();
.bind(imdb) let mut num_rows = 0;
.bind(w2w) let ids = &mut ids.into_iter();
.execute(&mut conn) while num_rows < num_ids {
.await let num_rows = &mut num_rows;
.unwrap(); let mut q = sqlx::QueryBuilder::new("insert into id_map (imdb, w2w) ");
q.push_values(ids.take(limit), |mut qb, row| {
qb.push_bind(row.0.clone());
qb.push_bind(row.1);
*num_rows += 1;
});
q.build().execute(&mut conn).await.unwrap();
} }
conn.close().await.unwrap(); conn.close().await.unwrap();
} }

View file

@@ -43,7 +43,8 @@ pub fn get_db_pool() -> SqlitePool {
// be sure to have run `make` so that the libjulid extension is built // be sure to have run `make` so that the libjulid extension is built
.extension("./libjulid") .extension("./libjulid")
.busy_timeout(Duration::from_secs(TIMEOUT)) .busy_timeout(Duration::from_secs(TIMEOUT))
.create_if_missing(true); .create_if_missing(true)
.pragma("mmap_size", "3000000000");
let pool = SqlitePoolOptions::new() let pool = SqlitePoolOptions::new()
.max_connections(MAX_CONNS) .max_connections(MAX_CONNS)

View file

@@ -82,7 +82,7 @@ pub async fn import_imdb_data(w2w_db: &SqlitePool, imdb: &SqlitePool, ids: &mut
.await .await
.unwrap(); .unwrap();
for batch in iwatches.chunks(1_000) { for batch in iwatches.chunks(2_000) {
let mut tx = w2w_db.acquire().await.unwrap(); let mut tx = w2w_db.acquire().await.unwrap();
let mut tx = tx.begin().await.unwrap(); let mut tx = tx.begin().await.unwrap();
for iwatch in batch { for iwatch in batch {