Clean up the import code.

Make things as batch-y as possible; add timers; tweak the db.
This commit is contained in:
Joe Ardent 2023-07-08 13:16:05 -07:00
parent c5993f57ee
commit 0e016552ab
6 changed files with 128 additions and 149 deletions

View file

@ -31,11 +31,10 @@ create table if not exists watches (
create table if not exists witch_watch ( create table if not exists witch_watch (
witch blob not null, witch blob not null,
watch blob not null, watch blob not null,
party blob, -- list of witch IDs, but we can also scan for friends that want to watch the same thing
priority int, -- 1-5 how much do you want to watch it priority int, -- 1-5 how much do you want to watch it
public boolean not null, public boolean not null default true,
watched boolean not null, watched boolean not null default false,
when_added int, when_added int not null default (unixepoch()),
when_watched int, when_watched int,
last_updated int not null default (unixepoch()), last_updated int not null default (unixepoch()),
foreign key (witch) references witches (id) on delete cascade on update no action, foreign key (witch) references witches (id) on delete cascade on update no action,

View file

@ -24,5 +24,16 @@ async fn main() {
let ww_db = get_db_pool().await; let ww_db = get_db_pool().await;
add_omega_watches(&ww_db, &movie_db).await; let start = std::time::Instant::now();
add_omega_watches(&ww_db, &movie_db).await.unwrap();
let end = std::time::Instant::now();
let dur = (end - start).as_secs_f32();
let rows: i32 = sqlx::query_scalar("select count(*) from watches")
.fetch_one(&ww_db)
.await
.unwrap();
println!("Added {rows} movies in {dur} seconds");
} }

View file

@ -1,7 +1,7 @@
use std::{ffi::OsString, time::Duration}; use std::{ffi::OsString, time::Duration};
use clap::Parser; use clap::Parser;
use rand::{rngs::ThreadRng, seq::SliceRandom, thread_rng, Rng}; use rand::{seq::SliceRandom, thread_rng, Rng};
use rand_distr::Normal; use rand_distr::Normal;
use sqlx::{ use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqlitePoolOptions},
@ -11,30 +11,10 @@ use tokio::task::JoinSet;
use tokio_retry::Retry; use tokio_retry::Retry;
use witch_watch::{ use witch_watch::{
get_db_pool, get_db_pool,
import_utils::{add_omega_watches, add_user, add_watch_quests}, import_utils::{add_omega_watches, add_users, add_watch_quests},
DbId, WatchQuest, DbId, User, WatchQuest,
}; };
#[derive(Debug, Parser)]
struct Cli {
/// path to the movie database
#[clap(long = "database", short)]
pub db_path: OsString,
/// number of users to create
#[clap(long, short, default_value_t = 1000)]
pub users: usize,
/// expected gaussian value for number of movies per use
#[clap(long = "movies", short, default_value_t = 100)]
pub movies_per_user: u32,
/// path to the dictionary to be used for usernames [default:
/// /usr/share/dict/words]
#[clap(long, short)]
pub words: Option<OsString>,
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let cli = Cli::parse(); let cli = Cli::parse();
@ -52,21 +32,29 @@ async fn main() {
let opts = SqliteConnectOptions::new().filename(&path).read_only(true); let opts = SqliteConnectOptions::new().filename(&path).read_only(true);
let movie_db = SqlitePoolOptions::new() let movie_db = SqlitePoolOptions::new()
.idle_timeout(Duration::from_secs(90)) .idle_timeout(Duration::from_secs(3))
.connect_with(opts) .connect_with(opts)
.await .await
.expect("could not open movies db"); .expect("could not open movies db");
let ww_db = get_db_pool().await; let ww_db = get_db_pool().await;
let users = &gen_users(num_users, &words, &ww_db).await; let users = &gen_users(num_users, &words, &ww_db).await;
let movies = &add_omega_watches(&ww_db, &movie_db).await; let movies = &add_omega_watches(&ww_db, &movie_db).await.unwrap();
let rng = &mut thread_rng(); let rng = &mut thread_rng();
let normal = Normal::new(mpu, mpu / 10.0).unwrap(); let normal = Normal::new(mpu, mpu / 10.0).unwrap();
let start = std::time::Instant::now();
for &user in users { for &user in users {
add_quests(user, movies, &ww_db, rng, normal).await; add_quests(user, movies, &ww_db, rng, normal).await;
} }
let end = std::time::Instant::now();
let rows: i32 = sqlx::query_scalar("select count(*) from witch_watch")
.fetch_one(&ww_db)
.await
.unwrap();
let dur = (end - start).as_secs_f32();
println!("Added {rows} quests in {dur} seconds");
} }
//-************************************************************************ //-************************************************************************
@ -90,28 +78,30 @@ async fn gen_users(num: usize, words: &[&str], pool: &SqlitePool) -> Vec<DbId> {
let username = format!("{n1}_{n2}{nn}"); let username = format!("{n1}_{n2}{nn}");
let displayname = Some(format!("{n1} {n2}")); let displayname = Some(format!("{n1} {n2}"));
let email = Some(format!("{username}@{email_domain}")); let email = Some(format!("{username}@{email_domain}"));
let id = add_user( let id = DbId::new();
pool, let user = User {
&username, id,
displayname.as_deref(), username,
email.as_deref(), displayname,
None, email,
) last_seen: None,
.await; pwhash: "can't password this".to_string(),
users.push(id); };
users.push(user);
} }
add_users(pool, &users).await.unwrap();
users users.into_iter().map(|u| u.id).collect()
} }
//-************************************************************************ //-************************************************************************
// batch add quests // batch add quests
//-************************************************************************ //-************************************************************************
async fn add_quests( async fn add_quests<R: Rng>(
user: DbId, user: DbId,
movies: &[DbId], movies: &[DbId],
ww_db: &SqlitePool, ww_db: &SqlitePool,
rng: &mut ThreadRng, rng: &mut R,
normal: Normal<f32>, normal: Normal<f32>,
) { ) {
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
@ -119,17 +109,17 @@ async fn add_quests(
let quests: Vec<WatchQuest> = movies let quests: Vec<WatchQuest> = movies
.choose_multiple(rng, num_movies) .choose_multiple(rng, num_movies)
.cloned() .cloned()
.map(|movie| WatchQuest { .map(|watch| WatchQuest {
user, user,
watch: movie, watch,
is_public: true, is_public: true,
already_watched: false, already_watched: false,
}) })
.collect(); .collect();
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(10)
.map(tokio_retry::strategy::jitter) .map(tokio_retry::strategy::jitter)
.take(4); .take(3);
let db = ww_db.clone(); let db = ww_db.clone();
tasks.spawn(async move { tasks.spawn(async move {
@ -146,3 +136,23 @@ async fn add_quests(
// get the stragglers // get the stragglers
while (tasks.join_next().await).is_some() {} while (tasks.join_next().await).is_some() {}
} }
#[derive(Debug, Parser)]
pub struct Cli {
/// path to the movie database
#[clap(long = "database", short)]
pub db_path: OsString,
/// number of users to create
#[clap(long, short, default_value_t = 1000)]
pub users: usize,
/// expected gaussian value for number of movies per use
#[clap(long = "movies", short, default_value_t = 100)]
pub movies_per_user: u32,
/// path to the dictionary to be used for usernames [default:
/// /usr/share/dict/words]
#[clap(long, short)]
pub words: Option<OsString>,
}

View file

@ -1,15 +1,6 @@
use std::sync::Arc; use sqlx::{query_scalar, SqlitePool};
use sqlx::{query, query_scalar, SqlitePool}; use crate::{db_id::DbId, util::year_to_epoch, ShowKind, User, Watch, WatchQuest};
use tokio::task::JoinSet;
use tokio_retry::Retry;
use crate::{
db_id::DbId,
util::year_to_epoch,
watches::handlers::{add_new_watch_impl, add_watch_quest_impl},
ShowKind, Watch, WatchQuest,
};
const USER_EXISTS_QUERY: &str = "select count(*) from witches where id = $1"; const USER_EXISTS_QUERY: &str = "select count(*) from witches where id = $1";
@ -20,6 +11,8 @@ const MOVIE_QUERY: &str = "select * from movies order by random() limit 10000";
//-************************************************************************ //-************************************************************************
const OMEGA_ID: u128 = u128::MAX; const OMEGA_ID: u128 = u128::MAX;
const BULK_INSERT: usize = 2_000;
#[derive(Debug, sqlx::FromRow, Clone)] #[derive(Debug, sqlx::FromRow, Clone)]
pub struct ImportMovieOmega { pub struct ImportMovieOmega {
pub title: String, pub title: String,
@ -58,64 +51,40 @@ impl From<&ImportMovieOmega> for Watch {
//-************************************************************************ //-************************************************************************
// utility functions for building CLI tools, currently just for benchmarking // utility functions for building CLI tools, currently just for benchmarking
//-************************************************************************ //-************************************************************************
pub async fn add_watch_omega(db_pool: &SqlitePool, movie: &ImportMovieOmega) -> Result<DbId, ()> {
let watch: Watch = movie.into();
if add_new_watch_impl(db_pool, &watch, None).await.is_ok() {
Ok(watch.id)
} else {
eprintln!("failed to add \"{}\"", watch.title);
Err(())
}
}
pub async fn add_watch_quest(db_pool: &SqlitePool, quest: WatchQuest) -> Result<(), ()> {
if add_watch_quest_impl(db_pool, &quest).await.is_ok() {
Ok(())
} else {
eprintln!("failed to add {:?}", quest.id());
Err(())
}
}
pub async fn add_watch_quests(pool: &SqlitePool, quests: &[WatchQuest]) -> Result<(), ()> { pub async fn add_watch_quests(pool: &SqlitePool, quests: &[WatchQuest]) -> Result<(), ()> {
let mut builder = sqlx::QueryBuilder::new("insert into witch_watch (witch, watch) ");
builder.push_values(quests, |mut b, quest| {
let user = quest.user;
let watch = quest.watch;
//eprintln!("{user}, {watch}");
b.push_bind(user).push_bind(watch);
});
let q = builder.build();
q.execute(pool).await.map_err(|e| {
dbg!(e);
})?;
Ok(())
}
pub async fn add_users(db_pool: &SqlitePool, users: &[User]) -> Result<(), ()> {
let mut builder = let mut builder =
sqlx::QueryBuilder::new("insert into witch_watch (witch, watch, public, watched) "); sqlx::QueryBuilder::new("insert into witches (id, username, displayname, email, pwhash) ");
builder.push_values(quests.iter(), |mut b, quest| {
b.push_bind(quest.user) builder.push_values(users.iter(), |mut b, user| {
.push_bind(quest.watch) b.push_bind(user.id)
.push_bind(quest.is_public) .push_bind(&user.username)
.push_bind(quest.already_watched); .push_bind(&user.displayname)
.push_bind(&user.email)
.push_bind(&user.pwhash);
}); });
let q = builder.build(); let q = builder.build();
q.execute(pool).await.map_err(|_| ())?; q.execute(db_pool).await.map_err(|_| ())?;
Ok(()) Ok(())
} }
pub async fn add_user( pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Result<Vec<DbId>, ()> {
db_pool: &SqlitePool,
username: &str,
displayname: Option<&str>,
email: Option<&str>,
id: Option<DbId>,
) -> DbId {
let pwhash = "you shall not password";
let id: DbId = id.unwrap_or_else(DbId::new);
if query(crate::signup::CREATE_QUERY)
.bind(id)
.bind(username)
.bind(displayname)
.bind(email)
.bind(pwhash)
.execute(db_pool)
.await
.is_err()
{
eprintln!("failed to add user \"{username}\"");
}
id
}
pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec<DbId> {
ensure_omega(ww_db).await; ensure_omega(ww_db).await;
let movies: Vec<ImportMovieOmega> = sqlx::query_as(MOVIE_QUERY) let movies: Vec<ImportMovieOmega> = sqlx::query_as(MOVIE_QUERY)
@ -123,55 +92,45 @@ pub async fn add_omega_watches(ww_db: &SqlitePool, movie_db: &SqlitePool) -> Vec
.await .await
.unwrap(); .unwrap();
let mut set = JoinSet::new(); let mut ids = Vec::with_capacity(10_000);
let movie_set = Vec::with_capacity(10_000); let omega: DbId = OMEGA_ID.into();
let movie_set = Arc::new(std::sync::Mutex::new(movie_set));
let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(100) for movies in movies.as_slice().chunks(BULK_INSERT) {
.map(tokio_retry::strategy::jitter) let mut builder = sqlx::QueryBuilder::new(
.take(4); "insert into watches (id, kind, title, length, release_date, added_by) ",
);
for movie in movies { builder.push_values(movies, |mut b, movie| {
let db = ww_db.clone(); let id = DbId::new();
let title = movie.title.as_str(); ids.push(id);
let year = movie.year.clone().unwrap(); let title = &movie.title;
let len = movie.length.clone().unwrap();
let retry_strategy = retry_strategy.clone();
let movie_set = movie_set.clone();
let key = format!("{title}{year}{len}"); b.push_bind(id)
set.spawn(async move { .push_bind(ShowKind::Movie)
( .push_bind(title)
key, .push_bind(movie.length.as_ref().and_then(|l| l.parse::<i64>().ok()))
Retry::spawn(retry_strategy, || async { .push_bind(year_to_epoch(movie.year.as_deref()))
if let Ok(id) = add_watch_omega(&db, &movie).await { .push_bind(omega);
let mut mset = movie_set.lock().unwrap();
mset.push(id);
Ok(())
} else {
Err(())
}
})
.await,
)
}); });
let q = builder.build();
q.execute(ww_db).await.map_err(|_| ())?;
} }
// stragglers
while (set.join_next().await).is_some() {} Ok(ids)
let movies = movie_set.lock().unwrap().clone();
movies
} }
pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId { pub async fn ensure_omega(db_pool: &SqlitePool) -> DbId {
if !check_omega_exists(db_pool).await { if !check_omega_exists(db_pool).await {
add_user( let omega = User {
db_pool, id: OMEGA_ID.into(),
"The Omega User", username: "The Omega User".to_string(),
Some("I am the end of all watches."), displayname: Some("I am the end of all watches".to_string()),
None, email: None,
Some(OMEGA_ID.into()), last_seen: None,
) pwhash: "you shall not password".to_string(),
.await; };
add_users(db_pool, &[omega]).await.unwrap();
} }
OMEGA_ID.into() OMEGA_ID.into()
} }

View file

@ -20,7 +20,7 @@ pub struct User {
pub displayname: Option<String>, pub displayname: Option<String>,
pub email: Option<String>, pub email: Option<String>,
pub last_seen: Option<i64>, pub last_seen: Option<i64>,
pub(crate) pwhash: String, pub pwhash: String,
} }
impl Debug for User { impl Debug for User {

View file

@ -199,7 +199,7 @@ pub async fn post_add_watch_quest(
todo!() todo!()
} }
pub async fn add_watch_quest_impl(pool: &SqlitePool, quest: &WatchQuest) -> Result<(), ()> { pub async fn _add_watch_quest_impl(pool: &SqlitePool, quest: &WatchQuest) -> Result<(), ()> {
query(ADD_WITCH_WATCH_QUERY) query(ADD_WITCH_WATCH_QUERY)
.bind(quest.user) .bind(quest.user)
.bind(quest.watch) .bind(quest.watch)