inserts 10k rows in 6 seconds

This commit is contained in:
Joe Ardent 2023-07-04 11:30:42 -07:00
parent 825db88039
commit e93e43786f
5 changed files with 74 additions and 9 deletions

13
Cargo.lock generated
View file

@ -2080,6 +2080,17 @@ dependencies = [
"syn 2.0.18",
]
[[package]]
name = "tokio-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [
"pin-project",
"rand",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
@ -2560,6 +2571,8 @@ dependencies = [
"sqlx",
"thiserror",
"tokio",
"tokio-retry",
"tokio-stream",
"tower",
"tower-http 0.4.1",
"tracing",

View file

@ -31,6 +31,8 @@ optional_optional_user = {path = "optional_optional_user"}
chrono = { version = "0.4", default-features = false, features = ["std", "clock"] }
clap = { version = "4.3.10", features = ["derive", "env", "unicode", "suggestions", "usage"] }
futures-util = { version = "0.3.28", features = ["tokio-io"] }
tokio-retry = "0.3.0"
tokio-stream = "0.1.14"
[dev-dependencies]
axum-test = "9.0.0"

View file

@ -1,11 +1,15 @@
use std::{ffi::OsString, time::Duration};
use std::{ffi::OsString, pin::Pin, time::Duration};
use clap::Parser;
use futures_util::stream::TryStreamExt;
use futures_util::Stream;
//use futures_util::stream::TryStreamExt;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use tokio::task::JoinSet;
use tokio_retry::Retry;
use tokio_stream::StreamExt;
use witch_watch::{
get_db_pool,
import_utils::{add_watch_omega, ensure_omega},
import_utils::{add_watch_omega, ensure_omega, ImportMovieOmega},
};
const MOVIE_QUERY: &str = "select * from movies order by random() limit 10000";
@ -30,11 +34,35 @@ async fn main() {
let ww_db = get_db_pool().await;
let mut movies = sqlx::query_as(MOVIE_QUERY).fetch(&movie_db);
let mut movies: Pin<Box<dyn Stream<Item = Result<ImportMovieOmega, _>> + Send>> =
sqlx::query_as(MOVIE_QUERY).fetch(&movie_db);
ensure_omega(&ww_db).await;
let mut set = JoinSet::new();
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
.map(tokio_retry::strategy::jitter)
.take(4);
while let Ok(Some(movie)) = movies.try_next().await {
add_watch_omega(&ww_db, movie).await;
let db = ww_db.clone();
let title = movie.title.as_str();
let year = movie.year.clone().unwrap();
let len = movie.length.clone().unwrap();
let retry_strategy = retry_strategy.clone();
let key = format!("{title}{year}{len}");
set.spawn(async move {
(
key,
Retry::spawn(retry_strategy, || async {
add_watch_omega(&db, &movie).await
})
.await,
)
});
}
// stragglers
while (set.join_next().await).is_some() {}
}

View file

@ -8,8 +8,8 @@ use axum_login::{
use session_store::SqliteSessionStore;
use sqlx::{
migrate::Migrator,
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
SqlitePool,
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
Executor, SqlitePool,
};
use crate::{db_id::DbId, User};
@ -45,7 +45,9 @@ pub async fn get_db_pool() -> SqlitePool {
let conn_opts = SqliteConnectOptions::new()
.foreign_keys(true)
.auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Incremental)
//.auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Incremental)
.journal_mode(SqliteJournalMode::Memory)
.synchronous(sqlx::sqlite::SqliteSynchronous::Off)
.filename(&db_filename)
.busy_timeout(Duration::from_secs(TIMEOUT))
.create_if_missing(true);
@ -59,6 +61,10 @@ pub async fn get_db_pool() -> SqlitePool {
.await
.expect("can't connect to database");
let mut conn = pool.acquire().await.unwrap();
conn.execute("PRAGMA cache_size = 1000000;").await.unwrap();
conn.execute("PRAGMA temp_store = MEMORY;").await.unwrap();
// let the filesystem settle before trying anything
// possibly not effective?
tokio::time::sleep(Duration::from_millis(500)).await;

View file

@ -29,15 +29,31 @@ impl From<ImportMovieOmega> for Watch {
}
}
impl From<&ImportMovieOmega> for Watch {
fn from(value: &ImportMovieOmega) -> Self {
Watch {
title: value.title.to_string(),
release_date: year_to_epoch(value.year.as_deref()),
length: value.length.as_ref().and_then(|v| v.parse::<i64>().ok()),
id: DbId::new(),
kind: ShowKind::Movie,
metadata_url: None,
added_by: OMEGA_ID.into(),
}
}
}
//-************************************************************************
// utility functions for building CLI tools, currently just for benchmarking
//-************************************************************************
pub async fn add_watch_omega(db_pool: &SqlitePool, movie: ImportMovieOmega) {
pub async fn add_watch_omega(db_pool: &SqlitePool, movie: &ImportMovieOmega) -> Result<(), ()> {
let watch: Watch = movie.into();
if add_new_watch_impl(db_pool, &watch, None).await.is_ok() {
println!("{}", watch.id);
Ok(())
} else {
eprintln!("failed to add \"{}\"", watch.title);
Err(())
}
}