From 4c44aa12b081c777c82192755ac85d1fe0f5bdca Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Thu, 6 Jul 2023 12:24:25 -0700 Subject: [PATCH] Now takes only 6 seconds, instead of 3600. Ready for final tests. --- results.txt | 6 +-- src/bin/import_users.rs | 117 +++++++++++++++++++++------------------- src/import_utils.rs | 28 +++++++--- 3 files changed, 86 insertions(+), 65 deletions(-) diff --git a/results.txt b/results.txt index d3171b2..c8d9d82 100644 --- a/results.txt +++ b/results.txt @@ -1,5 +1,3 @@ --rw-r--r-- 1 ardent ardent 1.6M Jul 4 12:27 .witch-watch.db --rw-r--r-- 1 ardent ardent 161K Jul 4 12:29 .witch-watch.db-wal --rw-r--r-- 1 ardent ardent 32K Jul 4 12:29 .witch-watch.db-shm +-rw-r--r-- 1 ardent ardent 17M Jul 6 10:05 /home/ardent/.witch-watch.db -4 seconds wall to add 10k movies, added by the omega user. +6 seconds to add 98,713 watch quests. diff --git a/src/bin/import_users.rs b/src/bin/import_users.rs index f2290fe..20a374d 100644 --- a/src/bin/import_users.rs +++ b/src/bin/import_users.rs @@ -1,7 +1,7 @@ -use std::{collections::BTreeSet, ffi::OsString, time::Duration}; +use std::{ffi::OsString, time::Duration}; use clap::Parser; -use rand::{thread_rng, Rng}; +use rand::{rngs::ThreadRng, seq::SliceRandom, thread_rng, Rng}; use rand_distr::Normal; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions}, @@ -11,7 +11,7 @@ use tokio::task::JoinSet; use tokio_retry::Retry; use witch_watch::{ get_db_pool, - import_utils::{add_omega_watches, add_user, add_watch_quest}, + import_utils::{add_omega_watches, add_user, add_watch_quests}, DbId, WatchQuest, }; @@ -61,53 +61,17 @@ async fn main() { let users = &gen_users(num_users, &words, &ww_db).await; let movies = &add_omega_watches(&ww_db, &movie_db).await; - let normal = Normal::new(mpu, mpu / 10.0).unwrap(); let rng = &mut thread_rng(); - for user in users { - let mut joinset = JoinSet::new(); - let mut mset = BTreeSet::new(); - let num_movies = rng.sample(normal) as usize; - - while mset.len() < num_movies { - let idx = rng.gen_range(0..10_000usize); - mset.insert(idx); - } - dbg!("done with mset pop"); - - let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) - .map(tokio_retry::strategy::jitter) - .take(4); - - for movie in mset.iter() { - let movie = movies[*movie]; - let quest = WatchQuest { - id: DbId::new(), - user: *user, - watch: movie, - is_public: true, - already_watched: false, - }; - - let retry_strategy = retry_strategy.clone(); - let db = ww_db.clone(); - let key = quest.id.as_string(); - joinset.spawn(async move { - ( - key, - Retry::spawn(retry_strategy, || async { - add_watch_quest(&db, quest).await - }) - .await, - ) - }); - } - - // stragglers - while (joinset.join_next().await).is_some() {} + let normal = Normal::new(mpu, mpu / 10.0).unwrap(); + for &user in users { + add_quests(user, movies, &ww_db, rng, normal).await; } } +//-************************************************************************ +// add the users +//-************************************************************************ async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec { let mut rng = thread_rng(); let rng = &mut rng; @@ -119,25 +83,70 @@ async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec { let n3 = rng.gen_range(range.clone()); let nn = rng.gen_range(0..200); - let n1 = &words[n1]; - let n2 = &words[n2]; - let email_domain = &words[n3]; + let n1 = words[n1].replace('\'', ""); + let n2 = words[n2].replace('\'', ""); + let email_domain = words[n3].replace('\'', ""); - let username = format!("{n1}{n2}{nn}"); + let username = format!("{n1}_{n2}{nn}"); let displayname = Some(format!("{n1} {n2}")); let email = Some(format!("{username}@{email_domain}")); - let id = DbId::new(); - - add_user( + let id = add_user( pool, &username, displayname.as_deref(), email.as_deref(), - Some(id), + None, ) .await; - (&mut users).push(id); + users.push(id); } users } + +//-************************************************************************ +// batch add quests +//-************************************************************************ +async fn add_quests( + user: DbId, + movies: &[DbId], + ww_db: &SqlitePool, + rng: &mut ThreadRng, + normal: Normal, +) { + let mut tasks = JoinSet::new(); + let num_movies = rng.sample(normal) as usize; + let quests: Vec = movies + .choose_multiple(rng, num_movies) + .cloned() + .map(|movie| { + let id = DbId::new(); + WatchQuest { + id, + user, + watch: movie, + is_public: true, + already_watched: false, + } + }) + .collect(); + + let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) + .map(tokio_retry::strategy::jitter) + .take(4); + + let db = ww_db.clone(); + tasks.spawn(async move { + let movies = quests; + ( + user, + Retry::spawn(retry_strategy, || async { + add_watch_quests(&db, &movies).await + }) + .await, + ) + }); + + // get the stragglers + while (tasks.join_next().await).is_some() {} +} diff --git a/src/import_utils.rs b/src/import_utils.rs index 730d2f7..30a7b2c 100644 --- a/src/import_utils.rs +++ b/src/import_utils.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use sqlx::{query, query_scalar, SqlitePool}; use tokio::task::JoinSet; @@ -77,13 +77,28 @@ pub async fn add_watch_quest(db_pool: &SqlitePool, quest: WatchQuest) -> Result< } } +pub async fn add_watch_quests(pool: &SqlitePool, quests: &[WatchQuest]) -> Result<(), ()> { + let mut builder = + sqlx::QueryBuilder::new("insert into witch_watch (id, witch, watch, public, watched) "); + builder.push_values(quests.iter(), |mut b, quest| { + b.push_bind(quest.id) + .push_bind(quest.user) + .push_bind(quest.watch) + .push_bind(quest.is_public) + .push_bind(quest.already_watched); + }); + let q = builder.build(); + q.execute(pool).await.map_err(|_| ())?; + Ok(()) +} + pub async fn add_user( db_pool: &SqlitePool, username: &str, displayname: Option<&str>, email: Option<&str>, id: Option, -) { +) -> DbId { let pwhash = "you shall not password"; let id: DbId = id.unwrap_or_else(DbId::new); if query(crate::signup::CREATE_QUERY) @@ -94,12 +109,11 @@ pub async fn add_user( .bind(pwhash) .execute(db_pool) .await - .is_ok() + .is_err() { - println!("{id}"); - } else { eprintln!("failed to add user \"{username}\""); } + id } pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec { @@ -112,7 +126,7 @@ pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec let mut set = JoinSet::new(); let movie_set = Vec::with_capacity(10_000); - let movie_set = Arc::new(Mutex::new(movie_set)); + let movie_set = Arc::new(std::sync::Mutex::new(movie_set)); let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) .map(tokio_retry::strategy::jitter) @@ -158,7 +172,7 @@ pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId { None, Some(OMEGA_ID.into()), ) - .await + .await; } OMEGA_ID.into() }