use std::{ffi::OsString, time::Duration}; use clap::Parser; use rand::{seq::SliceRandom, thread_rng, Rng}; use rand_distr::Normal; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions}, SqlitePool, }; use tokio::task::JoinSet; use tokio_retry::Retry; use what2watch::{ get_db_pool, import_utils::{add_omega_watches, add_users, add_watch_quests}, DbId, User, WatchQuest, }; #[tokio::main] async fn main() { let cli = Cli::parse(); let path = cli.db_path; let num_users = cli.users; let mpu = cli.movies_per_user as f32; let dict = if let Some(dict) = cli.words { dict } else { "/usr/share/dict/words".into() }; let words = std::fs::read_to_string(dict).expect("tried to open {dict:?}"); let words: Vec<&str> = words.split('\n').collect(); let opts = SqliteConnectOptions::new().filename(&path).read_only(true); let movie_db = SqlitePoolOptions::new() .idle_timeout(Duration::from_secs(3)) .connect_with(opts) .await .expect("could not open movies db"); let w2w_db = get_db_pool().await; let users = &gen_users(num_users, &words, &w2w_db).await; let movies = &add_omega_watches(&w2w_db, &movie_db).await.unwrap(); let rng = &mut thread_rng(); let normal = Normal::new(mpu, mpu / 10.0).unwrap(); let start = std::time::Instant::now(); for &user in users { add_quests(user, movies, &w2w_db, rng, normal).await; } let end = std::time::Instant::now(); let rows: i32 = sqlx::query_scalar("select count(*) from watch_quests") .fetch_one(&w2w_db) .await .unwrap(); let dur = (end - start).as_secs_f32(); println!("Added {rows} quests in {dur} seconds"); } //-************************************************************************ // add the users //-************************************************************************ async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec { let mut rng = thread_rng(); let rng = &mut rng; let range = 0usize..(words.len()); let mut users = Vec::with_capacity(num); for _ in 0..num { let n1 = rng.gen_range(range.clone()); let n2 = rng.gen_range(range.clone()); let n3 = rng.gen_range(range.clone()); let nn = rng.gen_range(0..200); let n1 = words[n1].replace('\'', ""); let n2 = words[n2].replace('\'', ""); let email_domain = words[n3].replace('\'', ""); let username = format!("{n1}_{n2}{nn}"); let displayname = Some(format!("{n1} {n2}")); let email = Some(format!("{username}@{email_domain}")); let id = DbId::new(); let user = User { id, username, displayname, email, last_seen: None, pwhash: "can't password this".to_string(), }; users.push(user); } add_users(pool, &users).await.unwrap(); users.into_iter().map(|u| u.id).collect() } //-************************************************************************ // batch add quests //-************************************************************************ async fn add_quests( user: DbId, movies: &[DbId], w2w_db: &SqlitePool, rng: &mut R, normal: Normal, ) { let mut tasks = JoinSet::new(); let num_movies = rng.sample(normal) as usize; let quests: Vec = movies .choose_multiple(rng, num_movies) .cloned() .map(|watch| WatchQuest { user, watch, is_public: true, already_watched: false, }) .collect(); let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(10) .map(tokio_retry::strategy::jitter) .take(3); let db = w2w_db.clone(); tasks.spawn(async move { let movies = quests; ( user, Retry::spawn(retry_strategy, || async { add_watch_quests(&db, &movies).await }) .await, ) }); // get the stragglers while (tasks.join_next().await).is_some() {} } #[derive(Debug, Parser)] pub struct Cli { /// path to the movie database #[clap(long = "database", short)] pub db_path: OsString, /// number of users to create #[clap(long, short, default_value_t = 1000)] pub users: usize, /// expected gaussian value for number of movies per use #[clap(long = "movies", short, default_value_t = 100)] pub movies_per_user: u32, /// path to the dictionary to be used for usernames [default: /// /usr/share/dict/words] #[clap(long, short)] pub words: Option, }