diff --git a/migrations/20230426221940_init.up.sql b/migrations/20230426221940_init.up.sql index 9cc1f88..3a72aab 100644 --- a/migrations/20230426221940_init.up.sql +++ b/migrations/20230426221940_init.up.sql @@ -31,11 +31,10 @@ create table if not exists watches ( create table if not exists witch_watch ( witch blob not null, watch blob not null, - party blob, -- list of witch IDs, but we can also scan for friends that want to watch the same thing priority int, -- 1-5 how much do you want to watch it - public boolean not null, - watched boolean not null, - when_added int, + public boolean not null default true, + watched boolean not null default false, + when_added int not null default (unixepoch()), when_watched int, last_updated int not null default (unixepoch()), foreign key (witch) references witches (id) on delete cascade on update no action, diff --git a/src/bin/import_omega.rs b/src/bin/import_omega.rs index 268cb3d..4a7e1b9 100644 --- a/src/bin/import_omega.rs +++ b/src/bin/import_omega.rs @@ -24,5 +24,16 @@ async fn main() { let ww_db = get_db_pool().await; - add_omega_watches(&ww_db, &movie_db).await; + let start = std::time::Instant::now(); + add_omega_watches(&ww_db, &movie_db).await.unwrap(); + let end = std::time::Instant::now(); + + let dur = (end - start).as_secs_f32(); + + let rows: i32 = sqlx::query_scalar("select count(*) from watches") + .fetch_one(&ww_db) + .await + .unwrap(); + + println!("Added {rows} movies in {dur} seconds"); } diff --git a/src/bin/import_users.rs b/src/bin/import_users.rs index e39da34..e2bf39c 100644 --- a/src/bin/import_users.rs +++ b/src/bin/import_users.rs @@ -1,7 +1,7 @@ use std::{ffi::OsString, time::Duration}; use clap::Parser; -use rand::{rngs::ThreadRng, seq::SliceRandom, thread_rng, Rng}; +use rand::{seq::SliceRandom, thread_rng, Rng}; use rand_distr::Normal; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions}, @@ -11,30 +11,10 @@ use tokio::task::JoinSet; use tokio_retry::Retry; use witch_watch::{ get_db_pool, - import_utils::{add_omega_watches, add_user, add_watch_quests}, - DbId, WatchQuest, + import_utils::{add_omega_watches, add_users, add_watch_quests}, + DbId, User, WatchQuest, }; -#[derive(Debug, Parser)] -struct Cli { - /// path to the movie database - #[clap(long = "database", short)] - pub db_path: OsString, - - /// number of users to create - #[clap(long, short, default_value_t = 1000)] - pub users: usize, - - /// expected gaussian value for number of movies per use - #[clap(long = "movies", short, default_value_t = 100)] - pub movies_per_user: u32, - - /// path to the dictionary to be used for usernames [default: - /// /usr/share/dict/words] - #[clap(long, short)] - pub words: Option, -} - #[tokio::main] async fn main() { let cli = Cli::parse(); @@ -52,21 +32,29 @@ async fn main() { let opts = SqliteConnectOptions::new().filename(&path).read_only(true); let movie_db = SqlitePoolOptions::new() - .idle_timeout(Duration::from_secs(90)) + .idle_timeout(Duration::from_secs(3)) .connect_with(opts) .await .expect("could not open movies db"); let ww_db = get_db_pool().await; let users = &gen_users(num_users, &words, &ww_db).await; - let movies = &add_omega_watches(&ww_db, &movie_db).await; + let movies = &add_omega_watches(&ww_db, &movie_db).await.unwrap(); let rng = &mut thread_rng(); let normal = Normal::new(mpu, mpu / 10.0).unwrap(); + let start = std::time::Instant::now(); for &user in users { add_quests(user, movies, &ww_db, rng, normal).await; } + let end = std::time::Instant::now(); + let rows: i32 = sqlx::query_scalar("select count(*) from witch_watch") + .fetch_one(&ww_db) + .await + .unwrap(); + let dur = (end - start).as_secs_f32(); + println!("Added {rows} quests in {dur} seconds"); } //-************************************************************************ @@ -90,28 +78,30 @@ async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec { let username = format!("{n1}_{n2}{nn}"); let displayname = Some(format!("{n1} {n2}")); let email = Some(format!("{username}@{email_domain}")); - let id = add_user( - pool, - &username, - displayname.as_deref(), - email.as_deref(), - None, - ) - .await; - users.push(id); + let id = DbId::new(); + let user = User { + id, + username, + displayname, + email, + last_seen: None, + pwhash: "can't password this".to_string(), + }; + users.push(user); } + add_users(pool, &users).await.unwrap(); - users + users.into_iter().map(|u| u.id).collect() } //-************************************************************************ // batch add quests //-************************************************************************ -async fn add_quests( +async fn add_quests( user: DbId, movies: &[DbId], ww_db: &SqlitePool, - rng: &mut ThreadRng, + rng: &mut R, normal: Normal, ) { let mut tasks = JoinSet::new(); @@ -119,17 +109,17 @@ async fn add_quests( let quests: Vec = movies .choose_multiple(rng, num_movies) .cloned() - .map(|movie| WatchQuest { + .map(|watch| WatchQuest { user, - watch: movie, + watch, is_public: true, already_watched: false, }) .collect(); - let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) + let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(10) .map(tokio_retry::strategy::jitter) - .take(4); + .take(3); let db = ww_db.clone(); tasks.spawn(async move { @@ -146,3 +136,23 @@ async fn add_quests( // get the stragglers while (tasks.join_next().await).is_some() {} } + +#[derive(Debug, Parser)] +pub struct Cli { + /// path to the movie database + #[clap(long = "database", short)] + pub db_path: OsString, + + /// number of users to create + #[clap(long, short, default_value_t = 1000)] + pub users: usize, + + /// expected gaussian value for number of movies per use + #[clap(long = "movies", short, default_value_t = 100)] + pub movies_per_user: u32, + + /// path to the dictionary to be used for usernames [default: + /// /usr/share/dict/words] + #[clap(long, short)] + pub words: Option, +} diff --git a/src/import_utils.rs b/src/import_utils.rs index 5fd5ed2..b706cdd 100644 --- a/src/import_utils.rs +++ b/src/import_utils.rs @@ -1,15 +1,6 @@ -use std::sync::Arc; +use sqlx::{query_scalar, SqlitePool}; -use sqlx::{query, query_scalar, SqlitePool}; -use tokio::task::JoinSet; -use tokio_retry::Retry; - -use crate::{ - db_id::DbId, - util::year_to_epoch, - watches::handlers::{add_new_watch_impl, add_watch_quest_impl}, - ShowKind, Watch, WatchQuest, -}; +use crate::{db_id::DbId, util::year_to_epoch, ShowKind, User, Watch, WatchQuest}; const USER_EXISTS_QUERY: &str = "select count(*) from witches where id = $1"; @@ -20,6 +11,8 @@ const MOVIE_QUERY: &str = "select * from movies order by random() limit 10000"; //-************************************************************************ const OMEGA_ID: u128 = u128::MAX; +const BULK_INSERT: usize = 2_000; + #[derive(Debug, sqlx::FromRow, Clone)] pub struct ImportMovieOmega { pub title: String, @@ -58,64 +51,40 @@ impl From<&ImportMovieOmega> for Watch { //-************************************************************************ // utility functions for building CLI tools, currently just for benchmarking //-************************************************************************ -pub async fn add_watch_omega(db_pool: &SqlitePool, movie: &ImportMovieOmega) -> Result { - let watch: Watch = movie.into(); - if add_new_watch_impl(db_pool, &watch, None).await.is_ok() { - Ok(watch.id) - } else { - eprintln!("failed to add \"{}\"", watch.title); - Err(()) - } -} - -pub async fn add_watch_quest(db_pool: &SqlitePool, quest: WatchQuest) -> Result<(), ()> { - if add_watch_quest_impl(db_pool, &quest).await.is_ok() { - Ok(()) - } else { - eprintln!("failed to add {:?}", quest.id()); - Err(()) - } -} - pub async fn add_watch_quests(pool: &SqlitePool, quests: &[WatchQuest]) -> Result<(), ()> { - let mut builder = - sqlx::QueryBuilder::new("insert into witch_watch (witch, watch, public, watched) "); - builder.push_values(quests.iter(), |mut b, quest| { - b.push_bind(quest.user) - .push_bind(quest.watch) - .push_bind(quest.is_public) - .push_bind(quest.already_watched); + let mut builder = sqlx::QueryBuilder::new("insert into witch_watch (witch, watch) "); + builder.push_values(quests, |mut b, quest| { + let user = quest.user; + let watch = quest.watch; + //eprintln!("{user}, {watch}"); + b.push_bind(user).push_bind(watch); }); + let q = builder.build(); - q.execute(pool).await.map_err(|_| ())?; + q.execute(pool).await.map_err(|e| { + dbg!(e); + })?; + Ok(()) } -pub async fn add_user( - db_pool: &SqlitePool, - username: &str, - displayname: Option<&str>, - email: Option<&str>, - id: Option, -) -> DbId { - let pwhash = "you shall not password"; - let id: DbId = id.unwrap_or_else(DbId::new); - if query(crate::signup::CREATE_QUERY) - .bind(id) - .bind(username) - .bind(displayname) - .bind(email) - .bind(pwhash) - .execute(db_pool) - .await - .is_err() - { - eprintln!("failed to add user \"{username}\""); - } - id +pub async fn add_users(db_pool: &SqlitePool, users: &[User]) -> Result<(), ()> { + let mut builder = + sqlx::QueryBuilder::new("insert into witches (id, username, displayname, email, pwhash) "); + + builder.push_values(users.iter(), |mut b, user| { + b.push_bind(user.id) + .push_bind(&user.username) + .push_bind(&user.displayname) + .push_bind(&user.email) + .push_bind(&user.pwhash); + }); + let q = builder.build(); + q.execute(db_pool).await.map_err(|_| ())?; + Ok(()) } -pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec { +pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Result, ()> { ensure_omega(ww_db).await; let movies: Vec = sqlx::query_as(MOVIE_QUERY) @@ -123,55 +92,45 @@ pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec .await .unwrap(); - let mut set = JoinSet::new(); - let movie_set = Vec::with_capacity(10_000); - let movie_set = Arc::new(std::sync::Mutex::new(movie_set)); + let mut ids = Vec::with_capacity(10_000); + let omega: DbId = OMEGA_ID.into(); - let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) - .map(tokio_retry::strategy::jitter) - .take(4); + for movies in movies.as_slice().chunks(BULK_INSERT) { + let mut builder = sqlx::QueryBuilder::new( + "insert into watches (id, kind, title, length, release_date, added_by) ", + ); - for movie in movies { - let db = ww_db.clone(); - let title = movie.title.as_str(); - let year = movie.year.clone().unwrap(); - let len = movie.length.clone().unwrap(); - let retry_strategy = retry_strategy.clone(); - let movie_set = movie_set.clone(); + builder.push_values(movies, |mut b, movie| { + let id = DbId::new(); + ids.push(id); + let title = &movie.title; - let key = format!("{title}{year}{len}"); - set.spawn(async move { - ( - key, - Retry::spawn(retry_strategy, || async { - if let Ok(id) = add_watch_omega(&db, &movie).await { - let mut mset = movie_set.lock().unwrap(); - mset.push(id); - Ok(()) - } else { - Err(()) - } - }) - .await, - ) + b.push_bind(id) + .push_bind(ShowKind::Movie) + .push_bind(title) + .push_bind(movie.length.as_ref().and_then(|l| l.parse::().ok())) + .push_bind(year_to_epoch(movie.year.as_deref())) + .push_bind(omega); }); + let q = builder.build(); + + q.execute(ww_db).await.map_err(|_| ())?; } - // stragglers - while (set.join_next().await).is_some() {} - let movies = movie_set.lock().unwrap().clone(); - movies + + Ok(ids) } pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId { if !check_omega_exists(db_pool).await { - add_user( - db_pool, - "The Omega User", - Some("I am the end of all watches."), - None, - Some(OMEGA_ID.into()), - ) - .await; + let omega = User { + id: OMEGA_ID.into(), + username: "The Omega User".to_string(), + displayname: Some("I am the end of all watches".to_string()), + email: None, + last_seen: None, + pwhash: "you shall not password".to_string(), + }; + add_users(db_pool, &[omega]).await.unwrap(); } OMEGA_ID.into() } diff --git a/src/users.rs b/src/users.rs index 885d40b..84e9a94 100644 --- a/src/users.rs +++ b/src/users.rs @@ -20,7 +20,7 @@ pub struct User { pub displayname: Option, pub email: Option, pub last_seen: Option, - pub(crate) pwhash: String, + pub pwhash: String, } impl Debug for User { diff --git a/src/watches/handlers.rs b/src/watches/handlers.rs index c252bb9..69a8683 100644 --- a/src/watches/handlers.rs +++ b/src/watches/handlers.rs @@ -199,7 +199,7 @@ pub async fn post_add_watch_quest( todo!() } -pub async fn add_watch_quest_impl(pool: &SqlitePool, quest: &WatchQuest) -> Result<(), ()> { +pub async fn _add_watch_quest_impl(pool: &SqlitePool, quest: &WatchQuest) -> Result<(), ()> { query(ADD_WITCH_WATCH_QUERY) .bind(quest.user) .bind(quest.watch)