refactored omega code, same results as before
This commit is contained in:
parent
5e4f5c07d8
commit
87fe035971
7 changed files with 231 additions and 60 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -2527,7 +2527,7 @@ dependencies = [
|
|||
"justerror",
|
||||
"optional_optional_user",
|
||||
"password-hash",
|
||||
"rand_core",
|
||||
"rand",
|
||||
"serde",
|
||||
"serde_test",
|
||||
"sqlx",
|
||||
|
|
41
Cargo.toml
41
Cargo.toml
|
@ -5,33 +5,34 @@ edition = "2021"
|
|||
default-run = "witch_watch"
|
||||
|
||||
[dependencies]
|
||||
axum = { version = "0.6", features = ["macros", "headers"] }
|
||||
# local proc macro
|
||||
optional_optional_user = {path = "optional_optional_user"}
|
||||
|
||||
# regular external deps
|
||||
argon2 = "0.5"
|
||||
askama = { version = "0.12", features = ["with-axum"] }
|
||||
askama_axum = "0.3"
|
||||
axum-macros = "0.3"
|
||||
tokio = { version = "1", features = ["full", "tracing"], default-features = false }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
tower = { version = "0.4", features = ["util", "timeout"], default-features = false }
|
||||
tower-http = { version = "0.4", features = ["add-extension", "trace"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
sqlx = { version = "0.6", default-features = false, features = ["runtime-tokio-rustls", "any", "sqlite", "chrono", "time"] }
|
||||
argon2 = "0.5"
|
||||
rand_core = { version = "0.6", features = ["getrandom"] }
|
||||
thiserror = "1"
|
||||
justerror = "1"
|
||||
password-hash = { version = "0.5", features = ["std", "getrandom"] }
|
||||
axum-login = { version = "0.5", features = ["sqlite", "sqlx"] }
|
||||
unicode-segmentation = "1"
|
||||
async-session = "3"
|
||||
ulid = { version = "1", features = ["rand"] }
|
||||
|
||||
# proc macros:
|
||||
optional_optional_user = {path = "optional_optional_user"}
|
||||
axum = { version = "0.6", features = ["macros", "headers"] }
|
||||
axum-login = { version = "0.5", features = ["sqlite", "sqlx"] }
|
||||
axum-macros = "0.3"
|
||||
chrono = { version = "0.4", default-features = false, features = ["std", "clock"] }
|
||||
clap = { version = "4.3.10", features = ["derive", "env", "unicode", "suggestions", "usage"] }
|
||||
justerror = "1"
|
||||
password-hash = { version = "0.5", features = ["std", "getrandom"] }
|
||||
rand = "0.8"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
sqlx = { version = "0.6", default-features = false, features = ["runtime-tokio-rustls", "any", "sqlite", "chrono", "time"] }
|
||||
thiserror = "1"
|
||||
tokio = { version = "1", features = ["full", "tracing"], default-features = false }
|
||||
tokio-retry = "0.3.0"
|
||||
tokio-stream = "0.1.14"
|
||||
tower = { version = "0.4", features = ["util", "timeout"], default-features = false }
|
||||
tower-http = { version = "0.4", features = ["add-extension", "trace"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
ulid = { version = "1", features = ["rand"] }
|
||||
unicode-segmentation = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
axum-test = "9.0.0"
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
use std::{ffi::OsString, pin::Pin, time::Duration};
|
||||
use std::{ffi::OsString, time::Duration};
|
||||
|
||||
use clap::Parser;
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_retry::Retry;
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use witch_watch::{
|
||||
get_db_pool,
|
||||
import_utils::{add_watch_omega, ensure_omega, ImportMovieOmega},
|
||||
import_utils::{add_omega_watches, ImportMovieOmega},
|
||||
};
|
||||
|
||||
const MOVIE_QUERY: &str = "select * from movies order by random() limit 10000";
|
||||
|
@ -32,35 +29,10 @@ async fn main() {
|
|||
|
||||
let ww_db = get_db_pool().await;
|
||||
|
||||
let mut movies: Pin<Box<dyn Stream<Item = Result<ImportMovieOmega, _>> + Send>> =
|
||||
sqlx::query_as(MOVIE_QUERY).fetch(&movie_db);
|
||||
let movies: Vec<ImportMovieOmega> = sqlx::query_as(MOVIE_QUERY)
|
||||
.fetch_all(&movie_db)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
ensure_omega(&ww_db).await;
|
||||
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
|
||||
.map(tokio_retry::strategy::jitter)
|
||||
.take(4);
|
||||
|
||||
while let Ok(Some(movie)) = movies.try_next().await {
|
||||
let db = ww_db.clone();
|
||||
let title = movie.title.as_str();
|
||||
let year = movie.year.clone().unwrap();
|
||||
let len = movie.length.clone().unwrap();
|
||||
let retry_strategy = retry_strategy.clone();
|
||||
|
||||
let key = format!("{title}{year}{len}");
|
||||
set.spawn(async move {
|
||||
(
|
||||
key,
|
||||
Retry::spawn(retry_strategy, || async {
|
||||
add_watch_omega(&db, &movie).await
|
||||
})
|
||||
.await,
|
||||
)
|
||||
});
|
||||
}
|
||||
// stragglers
|
||||
while (set.join_next().await).is_some() {}
|
||||
add_omega_watches(&ww_db, movies).await;
|
||||
}
|
||||
|
|
144
src/bin/import_users.rs
Normal file
144
src/bin/import_users.rs
Normal file
|
@ -0,0 +1,144 @@
|
|||
use std::{ffi::OsString, time::Duration};
|
||||
|
||||
use clap::Parser;
|
||||
use rand::{thread_rng, Rng};
|
||||
use sqlx::{
|
||||
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
|
||||
FromRow, SqlitePool,
|
||||
};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_retry::Retry;
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use witch_watch::{
|
||||
get_db_pool,
|
||||
import_utils::{add_user, add_watch_user, ImportMovieOmega},
|
||||
DbId, User, Watch, WatchQuest,
|
||||
};
|
||||
|
||||
const MOVIE_QUERY: &str = "select * from movies order by random() limit ?";
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct Cli {
|
||||
/// path to the movie database
|
||||
#[clap(long = "database", short)]
|
||||
pub db_path: OsString,
|
||||
|
||||
/// number of users to create
|
||||
#[clap(long, short, default_value_t = 1000)]
|
||||
pub users: usize,
|
||||
|
||||
/// expected gaussian value for number of movies per use
|
||||
#[clap(long = "movies", short, default_value_t = 100)]
|
||||
pub movies_per_user: u32,
|
||||
|
||||
/// path to the dictionary to be used for usernames [default:
|
||||
/// /usr/share/dict/words]
|
||||
#[clap(long, short)]
|
||||
pub words: Option<OsString>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let cli = Cli::parse();
|
||||
let path = cli.db_path;
|
||||
let num_users = cli.users;
|
||||
let mpu = cli.movies_per_user;
|
||||
let dict = if let Some(dict) = cli.words {
|
||||
dict
|
||||
} else {
|
||||
"/usr/share/dict/words".into()
|
||||
};
|
||||
|
||||
let words = std::fs::read_to_string(dict).expect("tried to open {dict:?}");
|
||||
let words: Vec<&str> = words.split('\n').collect();
|
||||
|
||||
let opts = SqliteConnectOptions::new().filename(&path).read_only(true);
|
||||
let movie_db = SqlitePoolOptions::new()
|
||||
.idle_timeout(Duration::from_secs(90))
|
||||
.connect_with(opts)
|
||||
.await
|
||||
.expect("could not open movies db");
|
||||
|
||||
let ww_db = get_db_pool().await;
|
||||
|
||||
let rng = &mut thread_rng();
|
||||
|
||||
let users = gen_users(num_users, &words, &ww_db).await;
|
||||
|
||||
for _ in 0..num_users {
|
||||
let mut movies = sqlx::query(MOVIE_QUERY).bind(mpu).fetch(&movie_db);
|
||||
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
|
||||
.map(tokio_retry::strategy::jitter)
|
||||
.take(4);
|
||||
|
||||
while let Ok(Some(movie)) = movies.try_next().await {
|
||||
let movie = ImportMovieOmega::from_row(&movie).unwrap();
|
||||
let mut watch: Watch = movie.into();
|
||||
let db = ww_db.clone();
|
||||
let retry_strategy = retry_strategy.clone();
|
||||
|
||||
let user = rng.gen_range(0..num_users);
|
||||
let user = users[user];
|
||||
watch.added_by = user;
|
||||
|
||||
let quest = WatchQuest {
|
||||
id: DbId::new(),
|
||||
user,
|
||||
watch: watch.id,
|
||||
is_public: true,
|
||||
already_watched: false,
|
||||
};
|
||||
|
||||
let key = quest.id.as_string();
|
||||
set.spawn(async move {
|
||||
(
|
||||
key,
|
||||
Retry::spawn(retry_strategy, || async {
|
||||
add_watch_user(&db, &watch, quest).await
|
||||
})
|
||||
.await,
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
// stragglers
|
||||
while (set.join_next().await).is_some() {}
|
||||
}
|
||||
}
|
||||
|
||||
async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec<DbId> {
|
||||
let mut rng = thread_rng();
|
||||
let rng = &mut rng;
|
||||
let range = 0usize..(words.len());
|
||||
let mut users = Vec::with_capacity(num);
|
||||
for _ in 0..num {
|
||||
let n1 = rng.gen_range(range.clone());
|
||||
let n2 = rng.gen_range(range.clone());
|
||||
let n3 = rng.gen_range(range.clone());
|
||||
let nn = rng.gen_range(0..200);
|
||||
|
||||
let n1 = &words[n1];
|
||||
let n2 = &words[n2];
|
||||
let email_domain = &words[n3];
|
||||
|
||||
let username = format!("{n1}{n2}{nn}");
|
||||
let displayname = Some(format!("{n1} {n2}"));
|
||||
let email = Some(format!("{username}@{email_domain}"));
|
||||
let id = DbId::new();
|
||||
|
||||
add_user(
|
||||
pool,
|
||||
&username,
|
||||
displayname.as_deref(),
|
||||
email.as_deref(),
|
||||
Some(id),
|
||||
)
|
||||
.await;
|
||||
(&mut users).push(id);
|
||||
}
|
||||
|
||||
users
|
||||
}
|
|
@ -30,8 +30,8 @@ pub async fn get_db_pool() -> SqlitePool {
|
|||
}
|
||||
#[cfg(test)]
|
||||
{
|
||||
use rand_core::RngCore;
|
||||
let mut rng = rand_core::OsRng;
|
||||
use rand::RngCore;
|
||||
let mut rng = rand::thread_rng();
|
||||
let id = rng.next_u64();
|
||||
// see https://www.sqlite.org/inmemorydb.html for meaning of the string;
|
||||
// it allows each separate test to have its own dedicated memory-backed db that
|
||||
|
|
|
@ -1,11 +1,17 @@
|
|||
use sqlx::{query, query_scalar, SqlitePool};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_retry::Retry;
|
||||
|
||||
use crate::{
|
||||
db_id::DbId, util::year_to_epoch, watches::handlers::add_new_watch_impl, ShowKind, Watch,
|
||||
WatchQuest,
|
||||
};
|
||||
|
||||
const USER_EXISTS_QUERY: &str = "select count(*) from witches where id = $1";
|
||||
|
||||
//-************************************************************************
|
||||
// the omega user is the system ID, but has no actual power in the app
|
||||
//-************************************************************************
|
||||
const OMEGA_ID: u128 = u128::MAX;
|
||||
|
||||
#[derive(Debug, sqlx::FromRow, Clone)]
|
||||
|
@ -57,6 +63,23 @@ pub async fn add_watch_omega(db_pool: &SqlitePool, movie: &ImportMovieOmega) ->
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn add_watch_user(
|
||||
db_pool: &SqlitePool,
|
||||
watch: &Watch,
|
||||
quest: WatchQuest,
|
||||
) -> Result<(), ()> {
|
||||
if add_new_watch_impl(db_pool, watch, Some(quest))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
println!("{}", watch.id);
|
||||
Ok(())
|
||||
} else {
|
||||
eprintln!("failed to add \"{}\"", watch.title);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_user(
|
||||
db_pool: &SqlitePool,
|
||||
username: &str,
|
||||
|
@ -82,6 +105,37 @@ pub async fn add_user(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn add_omega_watches(ww_db: &SqlitePool, movies: Vec<ImportMovieOmega>) {
|
||||
ensure_omega(ww_db).await;
|
||||
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
|
||||
.map(tokio_retry::strategy::jitter)
|
||||
.take(4);
|
||||
|
||||
for movie in movies {
|
||||
let db = ww_db.clone();
|
||||
let title = movie.title.as_str();
|
||||
let year = movie.year.clone().unwrap();
|
||||
let len = movie.length.clone().unwrap();
|
||||
let retry_strategy = retry_strategy.clone();
|
||||
|
||||
let key = format!("{title}{year}{len}");
|
||||
set.spawn(async move {
|
||||
(
|
||||
key,
|
||||
Retry::spawn(retry_strategy, || async {
|
||||
add_watch_omega(&db, &movie).await
|
||||
})
|
||||
.await,
|
||||
)
|
||||
});
|
||||
}
|
||||
// stragglers
|
||||
while (set.join_next().await).is_some() {}
|
||||
}
|
||||
|
||||
pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId {
|
||||
if !check_omega_exists(db_pool).await {
|
||||
add_user(
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
use rand_core::{OsRng, RngCore};
|
||||
use rand::{thread_rng, RngCore};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use witch_watch::get_db_pool;
|
||||
|
||||
|
@ -18,7 +18,7 @@ async fn main() {
|
|||
|
||||
let secret = {
|
||||
let mut bytes = [0u8; 64];
|
||||
let mut rng = OsRng;
|
||||
let mut rng = thread_rng();
|
||||
rng.fill_bytes(&mut bytes);
|
||||
bytes
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue