From e93e43786ff5cf64cff0221cb24b71e5273f7704 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Tue, 4 Jul 2023 11:30:42 -0700 Subject: [PATCH] inserts 10k rows in 6 seconds --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 2 ++ src/bin/import_omega.rs | 38 +++++++++++++++++++++++++++++++++----- src/db.rs | 12 +++++++++--- src/import_utils.rs | 18 +++++++++++++++++- 5 files changed, 74 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 75687f7..e3123f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2080,6 +2080,17 @@ dependencies = [ "syn 2.0.18", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -2560,6 +2571,8 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tokio-retry", + "tokio-stream", "tower", "tower-http 0.4.1", "tracing", diff --git a/Cargo.toml b/Cargo.toml index c2518bc..cf4421d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,8 @@ optional_optional_user = {path = "optional_optional_user"} chrono = { version = "0.4", default-features = false, features = ["std", "clock"] } clap = { version = "4.3.10", features = ["derive", "env", "unicode", "suggestions", "usage"] } futures-util = { version = "0.3.28", features = ["tokio-io"] } +tokio-retry = "0.3.0" +tokio-stream = "0.1.14" [dev-dependencies] axum-test = "9.0.0" diff --git a/src/bin/import_omega.rs b/src/bin/import_omega.rs index 8f92447..d744d66 100644 --- a/src/bin/import_omega.rs +++ b/src/bin/import_omega.rs @@ -1,11 +1,15 @@ -use std::{ffi::OsString, time::Duration}; +use std::{ffi::OsString, pin::Pin, time::Duration}; use clap::Parser; -use futures_util::stream::TryStreamExt; +use futures_util::Stream; +//use futures_util::stream::TryStreamExt; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use tokio::task::JoinSet; +use tokio_retry::Retry; +use tokio_stream::StreamExt; use witch_watch::{ get_db_pool, - import_utils::{add_watch_omega, ensure_omega}, + import_utils::{add_watch_omega, ensure_omega, ImportMovieOmega}, }; const MOVIE_QUERY: &str = "select * from movies order by random() limit 10000"; @@ -30,11 +34,35 @@ async fn main() { let ww_db = get_db_pool().await; - let mut movies = sqlx::query_as(MOVIE_QUERY).fetch(&movie_db); + let mut movies: Pin> + Send>> = + sqlx::query_as(MOVIE_QUERY).fetch(&movie_db); ensure_omega(&ww_db).await; + let mut set = JoinSet::new(); + + let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) + .map(tokio_retry::strategy::jitter) + .take(4); + while let Ok(Some(movie)) = movies.try_next().await { - add_watch_omega(&ww_db, movie).await; + let db = ww_db.clone(); + let title = movie.title.as_str(); + let year = movie.year.clone().unwrap(); + let len = movie.length.clone().unwrap(); + let retry_strategy = retry_strategy.clone(); + + let key = format!("{title}{year}{len}"); + set.spawn(async move { + ( + key, + Retry::spawn(retry_strategy, || async { + add_watch_omega(&db, &movie).await + }) + .await, + ) + }); } + // stragglers + while (set.join_next().await).is_some() {} } diff --git a/src/db.rs b/src/db.rs index 70540b6..9e80e72 100644 --- a/src/db.rs +++ b/src/db.rs @@ -8,8 +8,8 @@ use axum_login::{ use session_store::SqliteSessionStore; use sqlx::{ migrate::Migrator, - sqlite::{SqliteConnectOptions, SqlitePoolOptions}, - SqlitePool, + sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, + Executor, SqlitePool, }; use crate::{db_id::DbId, User}; @@ -45,7 +45,9 @@ pub async fn get_db_pool() -> SqlitePool { let conn_opts = SqliteConnectOptions::new() .foreign_keys(true) - .auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Incremental) + //.auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Incremental) + .journal_mode(SqliteJournalMode::Memory) + .synchronous(sqlx::sqlite::SqliteSynchronous::Off) .filename(&db_filename) .busy_timeout(Duration::from_secs(TIMEOUT)) .create_if_missing(true); @@ -59,6 +61,10 @@ pub async fn get_db_pool() -> SqlitePool { .await .expect("can't connect to database"); + let mut conn = pool.acquire().await.unwrap(); + conn.execute("PRAGMA cache_size = 1000000;").await.unwrap(); + conn.execute("PRAGMA temp_store = MEMORY;").await.unwrap(); + // let the filesystem settle before trying anything // possibly not effective? tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/src/import_utils.rs b/src/import_utils.rs index 3bd958f..df2b99f 100644 --- a/src/import_utils.rs +++ b/src/import_utils.rs @@ -29,15 +29,31 @@ impl From for Watch { } } +impl From<&ImportMovieOmega> for Watch { + fn from(value: &ImportMovieOmega) -> Self { + Watch { + title: value.title.to_string(), + release_date: year_to_epoch(value.year.as_deref()), + length: value.length.as_ref().and_then(|v| v.parse::().ok()), + id: DbId::new(), + kind: ShowKind::Movie, + metadata_url: None, + added_by: OMEGA_ID.into(), + } + } +} + //-************************************************************************ // utility functions for building CLI tools, currently just for benchmarking //-************************************************************************ -pub async fn add_watch_omega(db_pool: &SqlitePool, movie: ImportMovieOmega) { +pub async fn add_watch_omega(db_pool: &SqlitePool, movie: &ImportMovieOmega) -> Result<(), ()> { let watch: Watch = movie.into(); if add_new_watch_impl(db_pool, &watch, None).await.is_ok() { println!("{}", watch.id); + Ok(()) } else { eprintln!("failed to add \"{}\"", watch.title); + Err(()) } }