diff --git a/Cargo.lock b/Cargo.lock index 8fae242..b96a0ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2527,7 +2527,7 @@ dependencies = [ "justerror", "optional_optional_user", "password-hash", - "rand_core", + "rand", "serde", "serde_test", "sqlx", diff --git a/Cargo.toml b/Cargo.toml index 28da39b..43a7758 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,33 +5,34 @@ edition = "2021" default-run = "witch_watch" [dependencies] -axum = { version = "0.6", features = ["macros", "headers"] } +# local proc macro +optional_optional_user = {path = "optional_optional_user"} + +# regular external deps +argon2 = "0.5" askama = { version = "0.12", features = ["with-axum"] } askama_axum = "0.3" -axum-macros = "0.3" -tokio = { version = "1", features = ["full", "tracing"], default-features = false } -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tower = { version = "0.4", features = ["util", "timeout"], default-features = false } -tower-http = { version = "0.4", features = ["add-extension", "trace"] } -serde = { version = "1", features = ["derive"] } -sqlx = { version = "0.6", default-features = false, features = ["runtime-tokio-rustls", "any", "sqlite", "chrono", "time"] } -argon2 = "0.5" -rand_core = { version = "0.6", features = ["getrandom"] } -thiserror = "1" -justerror = "1" -password-hash = { version = "0.5", features = ["std", "getrandom"] } -axum-login = { version = "0.5", features = ["sqlite", "sqlx"] } -unicode-segmentation = "1" async-session = "3" -ulid = { version = "1", features = ["rand"] } - -# proc macros: -optional_optional_user = {path = "optional_optional_user"} +axum = { version = "0.6", features = ["macros", "headers"] } +axum-login = { version = "0.5", features = ["sqlite", "sqlx"] } +axum-macros = "0.3" chrono = { version = "0.4", default-features = false, features = ["std", "clock"] } clap = { version = "4.3.10", features = ["derive", "env", "unicode", "suggestions", "usage"] } +justerror = "1" +password-hash = { version = "0.5", features = ["std", "getrandom"] } +rand = "0.8" +serde = { version = "1", features = ["derive"] } +sqlx = { version = "0.6", default-features = false, features = ["runtime-tokio-rustls", "any", "sqlite", "chrono", "time"] } +thiserror = "1" +tokio = { version = "1", features = ["full", "tracing"], default-features = false } tokio-retry = "0.3.0" tokio-stream = "0.1.14" +tower = { version = "0.4", features = ["util", "timeout"], default-features = false } +tower-http = { version = "0.4", features = ["add-extension", "trace"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +ulid = { version = "1", features = ["rand"] } +unicode-segmentation = "1" [dev-dependencies] axum-test = "9.0.0" diff --git a/src/bin/import_omega.rs b/src/bin/import_omega.rs index 893ca7b..7eaf2c1 100644 --- a/src/bin/import_omega.rs +++ b/src/bin/import_omega.rs @@ -1,13 +1,10 @@ -use std::{ffi::OsString, pin::Pin, time::Duration}; +use std::{ffi::OsString, time::Duration}; use clap::Parser; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use tokio::task::JoinSet; -use tokio_retry::Retry; -use tokio_stream::{Stream, StreamExt}; use witch_watch::{ get_db_pool, - import_utils::{add_watch_omega, ensure_omega, ImportMovieOmega}, + import_utils::{add_omega_watches, ImportMovieOmega}, }; const MOVIE_QUERY: &str = "select * from movies order by random() limit 10000"; @@ -32,35 +29,10 @@ async fn main() { let ww_db = get_db_pool().await; - let mut movies: Pin> + Send>> = - sqlx::query_as(MOVIE_QUERY).fetch(&movie_db); + let movies: Vec = sqlx::query_as(MOVIE_QUERY) + .fetch_all(&movie_db) + .await + .unwrap(); - ensure_omega(&ww_db).await; - - let mut set = JoinSet::new(); - - let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) - .map(tokio_retry::strategy::jitter) - .take(4); - - while let Ok(Some(movie)) = movies.try_next().await { - let db = ww_db.clone(); - let title = movie.title.as_str(); - let year = movie.year.clone().unwrap(); - let len = movie.length.clone().unwrap(); - let retry_strategy = retry_strategy.clone(); - - let key = format!("{title}{year}{len}"); - set.spawn(async move { - ( - key, - Retry::spawn(retry_strategy, || async { - add_watch_omega(&db, &movie).await - }) - .await, - ) - }); - } - // stragglers - while (set.join_next().await).is_some() {} + add_omega_watches(&ww_db, movies).await; } diff --git a/src/bin/import_users.rs b/src/bin/import_users.rs new file mode 100644 index 0000000..be6f968 --- /dev/null +++ b/src/bin/import_users.rs @@ -0,0 +1,144 @@ +use std::{ffi::OsString, time::Duration}; + +use clap::Parser; +use rand::{thread_rng, Rng}; +use sqlx::{ + sqlite::{SqliteConnectOptions, SqlitePoolOptions}, + FromRow, SqlitePool, +}; +use tokio::task::JoinSet; +use tokio_retry::Retry; +use tokio_stream::{Stream, StreamExt}; +use witch_watch::{ + get_db_pool, + import_utils::{add_user, add_watch_user, ImportMovieOmega}, + DbId, User, Watch, WatchQuest, +}; + +const MOVIE_QUERY: &str = "select * from movies order by random() limit ?"; + +#[derive(Debug, Parser)] +struct Cli { + /// path to the movie database + #[clap(long = "database", short)] + pub db_path: OsString, + + /// number of users to create + #[clap(long, short, default_value_t = 1000)] + pub users: usize, + + /// expected gaussian value for number of movies per use + #[clap(long = "movies", short, default_value_t = 100)] + pub movies_per_user: u32, + + /// path to the dictionary to be used for usernames [default: + /// /usr/share/dict/words] + #[clap(long, short)] + pub words: Option, +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let path = cli.db_path; + let num_users = cli.users; + let mpu = cli.movies_per_user; + let dict = if let Some(dict) = cli.words { + dict + } else { + "/usr/share/dict/words".into() + }; + + let words = std::fs::read_to_string(dict).expect("tried to open {dict:?}"); + let words: Vec<&str> = words.split('\n').collect(); + + let opts = SqliteConnectOptions::new().filename(&path).read_only(true); + let movie_db = SqlitePoolOptions::new() + .idle_timeout(Duration::from_secs(90)) + .connect_with(opts) + .await + .expect("could not open movies db"); + + let ww_db = get_db_pool().await; + + let rng = &mut thread_rng(); + + let users = gen_users(num_users, &words, &ww_db).await; + + for _ in 0..num_users { + let mut movies = sqlx::query(MOVIE_QUERY).bind(mpu).fetch(&movie_db); + + let mut set = JoinSet::new(); + + let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) + .map(tokio_retry::strategy::jitter) + .take(4); + + while let Ok(Some(movie)) = movies.try_next().await { + let movie = ImportMovieOmega::from_row(&movie).unwrap(); + let mut watch: Watch = movie.into(); + let db = ww_db.clone(); + let retry_strategy = retry_strategy.clone(); + + let user = rng.gen_range(0..num_users); + let user = users[user]; + watch.added_by = user; + + let quest = WatchQuest { + id: DbId::new(), + user, + watch: watch.id, + is_public: true, + already_watched: false, + }; + + let key = quest.id.as_string(); + set.spawn(async move { + ( + key, + Retry::spawn(retry_strategy, || async { + add_watch_user(&db, &watch, quest).await + }) + .await, + ) + }); + } + + // stragglers + while (set.join_next().await).is_some() {} + } +} + +async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec { + let mut rng = thread_rng(); + let rng = &mut rng; + let range = 0usize..(words.len()); + let mut users = Vec::with_capacity(num); + for _ in 0..num { + let n1 = rng.gen_range(range.clone()); + let n2 = rng.gen_range(range.clone()); + let n3 = rng.gen_range(range.clone()); + let nn = rng.gen_range(0..200); + + let n1 = &words[n1]; + let n2 = &words[n2]; + let email_domain = &words[n3]; + + let username = format!("{n1}{n2}{nn}"); + let displayname = Some(format!("{n1} {n2}")); + let email = Some(format!("{username}@{email_domain}")); + let id = DbId::new(); + + add_user( + pool, + &username, + displayname.as_deref(), + email.as_deref(), + Some(id), + ) + .await; + (&mut users).push(id); + } + + users +} diff --git a/src/db.rs b/src/db.rs index 926f3c4..f2ce37d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -30,8 +30,8 @@ pub async fn get_db_pool() -> SqlitePool { } #[cfg(test)] { - use rand_core::RngCore; - let mut rng = rand_core::OsRng; + use rand::RngCore; + let mut rng = rand::thread_rng(); let id = rng.next_u64(); // see https://www.sqlite.org/inmemorydb.html for meaning of the string; // it allows each separate test to have its own dedicated memory-backed db that diff --git a/src/import_utils.rs b/src/import_utils.rs index d10c018..884d3f8 100644 --- a/src/import_utils.rs +++ b/src/import_utils.rs @@ -1,11 +1,17 @@ use sqlx::{query, query_scalar, SqlitePool}; +use tokio::task::JoinSet; +use tokio_retry::Retry; use crate::{ db_id::DbId, util::year_to_epoch, watches::handlers::add_new_watch_impl, ShowKind, Watch, + WatchQuest, }; const USER_EXISTS_QUERY: &str = "select count(*) from witches where id = $1"; +//-************************************************************************ +// the omega user is the system ID, but has no actual power in the app +//-************************************************************************ const OMEGA_ID: u128 = u128::MAX; #[derive(Debug, sqlx::FromRow, Clone)] @@ -57,6 +63,23 @@ pub async fn add_watch_omega(db_pool: &SqlitePool, movie: &ImportMovieOmega) -> } } +pub async fn add_watch_user( + db_pool: &SqlitePool, + watch: &Watch, + quest: WatchQuest, +) -> Result<(), ()> { + if add_new_watch_impl(db_pool, watch, Some(quest)) + .await + .is_ok() + { + println!("{}", watch.id); + Ok(()) + } else { + eprintln!("failed to add \"{}\"", watch.title); + Err(()) + } +} + pub async fn add_user( db_pool: &SqlitePool, username: &str, @@ -82,6 +105,37 @@ pub async fn add_user( } } +pub async fn add_omega_watches(ww_db: &SqlitePool, movies: Vec) { + ensure_omega(ww_db).await; + + let mut set = JoinSet::new(); + + let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) + .map(tokio_retry::strategy::jitter) + .take(4); + + for movie in movies { + let db = ww_db.clone(); + let title = movie.title.as_str(); + let year = movie.year.clone().unwrap(); + let len = movie.length.clone().unwrap(); + let retry_strategy = retry_strategy.clone(); + + let key = format!("{title}{year}{len}"); + set.spawn(async move { + ( + key, + Retry::spawn(retry_strategy, || async { + add_watch_omega(&db, &movie).await + }) + .await, + ) + }); + } + // stragglers + while (set.join_next().await).is_some() {} +} + pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId { if !check_omega_exists(db_pool).await { add_user( diff --git a/src/main.rs b/src/main.rs index d9c1062..4c07a0f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use rand_core::{OsRng, RngCore}; +use rand::{thread_rng, RngCore}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use witch_watch::get_db_pool; @@ -18,7 +18,7 @@ async fn main() { let secret = { let mut bytes = [0u8; 64]; - let mut rng = OsRng; + let mut rng = thread_rng(); rng.fill_bytes(&mut bytes); bytes };