Now takes only 6 seconds, instead of 3600. Ready for final tests.

This commit is contained in:
Joe Ardent 2023-07-06 12:24:25 -07:00
parent 4f0540a903
commit 4c44aa12b0
3 changed files with 86 additions and 65 deletions

View file

@ -1,5 +1,3 @@
-rw-r--r-- 1 ardent ardent 1.6M Jul 4 12:27 .witch-watch.db -rw-r--r-- 1 ardent ardent 17M Jul 6 10:05 /home/ardent/.witch-watch.db
-rw-r--r-- 1 ardent ardent 161K Jul 4 12:29 .witch-watch.db-wal
-rw-r--r-- 1 ardent ardent 32K Jul 4 12:29 .witch-watch.db-shm
4 seconds wall to add 10k movies, added by the omega user. 6 seconds to add 98,713 watch quests.

View file

@ -1,7 +1,7 @@
use std::{collections::BTreeSet, ffi::OsString, time::Duration}; use std::{ffi::OsString, time::Duration};
use clap::Parser; use clap::Parser;
use rand::{thread_rng, Rng}; use rand::{rngs::ThreadRng, seq::SliceRandom, thread_rng, Rng};
use rand_distr::Normal; use rand_distr::Normal;
use sqlx::{ use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqlitePoolOptions},
@ -11,7 +11,7 @@ use tokio::task::JoinSet;
use tokio_retry::Retry; use tokio_retry::Retry;
use witch_watch::{ use witch_watch::{
get_db_pool, get_db_pool,
import_utils::{add_omega_watches, add_user, add_watch_quest}, import_utils::{add_omega_watches, add_user, add_watch_quests},
DbId, WatchQuest, DbId, WatchQuest,
}; };
@ -61,53 +61,17 @@ async fn main() {
let users = &gen_users(num_users, &words, &ww_db).await; let users = &gen_users(num_users, &words, &ww_db).await;
let movies = &add_omega_watches(&ww_db, &movie_db).await; let movies = &add_omega_watches(&ww_db, &movie_db).await;
let normal = Normal::new(mpu, mpu / 10.0).unwrap();
let rng = &mut thread_rng(); let rng = &mut thread_rng();
for user in users {
let mut joinset = JoinSet::new();
let mut mset = BTreeSet::new(); let normal = Normal::new(mpu, mpu / 10.0).unwrap();
let num_movies = rng.sample(normal) as usize; for &user in users {
add_quests(user, movies, &ww_db, rng, normal).await;
while mset.len() < num_movies {
let idx = rng.gen_range(0..10_000usize);
mset.insert(idx);
}
dbg!("done with mset pop");
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
.map(tokio_retry::strategy::jitter)
.take(4);
for movie in mset.iter() {
let movie = movies[*movie];
let quest = WatchQuest {
id: DbId::new(),
user: *user,
watch: movie,
is_public: true,
already_watched: false,
};
let retry_strategy = retry_strategy.clone();
let db = ww_db.clone();
let key = quest.id.as_string();
joinset.spawn(async move {
(
key,
Retry::spawn(retry_strategy, || async {
add_watch_quest(&db, quest).await
})
.await,
)
});
}
// stragglers
while (joinset.join_next().await).is_some() {}
} }
} }
//-************************************************************************
// add the users
//-************************************************************************
async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec<DbId> { async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec<DbId> {
let mut rng = thread_rng(); let mut rng = thread_rng();
let rng = &mut rng; let rng = &mut rng;
@ -119,25 +83,70 @@ async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec<DbId> {
let n3 = rng.gen_range(range.clone()); let n3 = rng.gen_range(range.clone());
let nn = rng.gen_range(0..200); let nn = rng.gen_range(0..200);
let n1 = &words[n1]; let n1 = words[n1].replace('\'', "");
let n2 = &words[n2]; let n2 = words[n2].replace('\'', "");
let email_domain = &words[n3]; let email_domain = words[n3].replace('\'', "");
let username = format!("{n1}{n2}{nn}"); let username = format!("{n1}_{n2}{nn}");
let displayname = Some(format!("{n1} {n2}")); let displayname = Some(format!("{n1} {n2}"));
let email = Some(format!("{username}@{email_domain}")); let email = Some(format!("{username}@{email_domain}"));
let id = DbId::new(); let id = add_user(
add_user(
pool, pool,
&username, &username,
displayname.as_deref(), displayname.as_deref(),
email.as_deref(), email.as_deref(),
Some(id), None,
) )
.await; .await;
(&mut users).push(id); users.push(id);
} }
users users
} }
//-************************************************************************
// batch add quests
//-************************************************************************
/// Picks a random selection of `movies` for `user` and inserts the resulting
/// watch quests as a single batch.
///
/// The number of movies is sampled from `normal`; the movies themselves are
/// chosen without repetition via `choose_multiple`. Every generated quest is
/// public and not yet watched. The batch insert runs in a spawned task and is
/// retried with a jittered exponential backoff (at most four retries).
///
/// Nothing is returned; a batch that still fails after the retries, or a task
/// that could not be joined, is reported on stderr.
async fn add_quests(
    user: DbId,
    movies: &[DbId],
    ww_db: &SqlitePool,
    rng: &mut ThreadRng,
    normal: Normal<f32>,
) {
    // Float-to-integer `as` casts saturate, so a negative sample becomes zero.
    let num_movies = rng.sample(normal) as usize;
    let quests: Vec<WatchQuest> = movies
        .choose_multiple(rng, num_movies)
        .cloned()
        .map(|movie| WatchQuest {
            id: DbId::new(),
            user,
            watch: movie,
            is_public: true,
            already_watched: false,
        })
        .collect();

    // An empty batch has nothing to insert; sending it anyway would only
    // produce a failing statement that is then retried with backoff.
    if quests.is_empty() {
        return;
    }

    let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
        .map(tokio_retry::strategy::jitter)
        .take(4);

    let db = ww_db.clone();
    let mut tasks = JoinSet::new();
    tasks.spawn(async move {
        // `quests` and `db` are moved into the task so the future is 'static.
        Retry::spawn(retry_strategy, || async {
            add_watch_quests(&db, &quests).await
        })
        .await
    });

    // Wait for the insert task and surface failures instead of dropping them.
    while let Some(outcome) = tasks.join_next().await {
        match outcome {
            Ok(Ok(())) => {}
            Ok(Err(())) => eprintln!("failed to add watch quests for user {user}"),
            Err(err) => eprintln!("watch quest task for user {user} did not complete: {err}"),
        }
    }
}

View file

@ -1,4 +1,4 @@
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use sqlx::{query, query_scalar, SqlitePool}; use sqlx::{query, query_scalar, SqlitePool};
use tokio::task::JoinSet; use tokio::task::JoinSet;
@ -77,13 +77,28 @@ pub async fn add_watch_quest(db_pool: &SqlitePool, quest: WatchQuest) -> Result<
} }
} }
/// Inserts all of `quests` into the `witch_watch` table with one multi-row
/// `insert` statement.
///
/// Each quest contributes one row, binding its id, user, watch, public flag and
/// watched flag in that column order.
///
/// # Errors
///
/// Returns `Err(())` when executing the statement fails; the underlying
/// database error is written to stderr because the error type carries no
/// detail. An empty `quests` slice is a no-op and returns `Ok(())`.
pub async fn add_watch_quests(pool: &SqlitePool, quests: &[WatchQuest]) -> Result<(), ()> {
    // With no rows, `push_values` would leave a `VALUES` clause without any
    // tuples, which is not a valid statement; there is nothing to insert anyway.
    if quests.is_empty() {
        return Ok(());
    }

    // NOTE(review): every quest uses five bind parameters and SQLite limits the
    // number of parameters per statement; very large batches may need to be
    // split into chunks — confirm against the configured limit.
    let mut builder =
        sqlx::QueryBuilder::new("insert into witch_watch (id, witch, watch, public, watched) ");

    builder.push_values(quests.iter(), |mut row, quest| {
        row.push_bind(quest.id)
            .push_bind(quest.user)
            .push_bind(quest.watch)
            .push_bind(quest.is_public)
            .push_bind(quest.already_watched);
    });

    let insert = builder.build();
    match insert.execute(pool).await {
        Ok(_) => Ok(()),
        Err(err) => {
            eprintln!("failed to add {} watch quests: {err}", quests.len());
            Err(())
        }
    }
}
pub async fn add_user( pub async fn add_user(
db_pool: &SqlitePool, db_pool: &SqlitePool,
username: &str, username: &str,
displayname: Option<&str>, displayname: Option<&str>,
email: Option<&str>, email: Option<&str>,
id: Option<DbId>, id: Option<DbId>,
) { ) -> DbId {
let pwhash = "you shall not password"; let pwhash = "you shall not password";
let id: DbId = id.unwrap_or_else(DbId::new); let id: DbId = id.unwrap_or_else(DbId::new);
if query(crate::signup::CREATE_QUERY) if query(crate::signup::CREATE_QUERY)
@ -94,12 +109,11 @@ pub async fn add_user(
.bind(pwhash) .bind(pwhash)
.execute(db_pool) .execute(db_pool)
.await .await
.is_ok() .is_err()
{ {
println!("{id}");
} else {
eprintln!("failed to add user \"{username}\""); eprintln!("failed to add user \"{username}\"");
} }
id
} }
pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec<DbId> { pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec<DbId> {
@ -112,7 +126,7 @@ pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec
let mut set = JoinSet::new(); let mut set = JoinSet::new();
let movie_set = Vec::with_capacity(10_000); let movie_set = Vec::with_capacity(10_000);
let movie_set = Arc::new(Mutex::new(movie_set)); let movie_set = Arc::new(std::sync::Mutex::new(movie_set));
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100)
.map(tokio_retry::strategy::jitter) .map(tokio_retry::strategy::jitter)
@ -158,7 +172,7 @@ pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId {
None, None,
Some(OMEGA_ID.into()), Some(OMEGA_ID.into()),
) )
.await .await;
} }
OMEGA_ID.into() OMEGA_ID.into()
} }