From bb33e28849c65f60106e1f55fc89f723d39420cd Mon Sep 17 00:00:00 2001
From: joe
Date: Thu, 1 Jan 2026 12:42:18 -0800
Subject: [PATCH] refactor (WIP): rework feed status tracking and drop the
 DbAction layer

Replace the feeds/fetches/successful_runs tables with feeds, status, and
runs; scope feed URLs to their owner; record fetch and post times per run;
and call sqlx directly instead of going through DbAction/DbValue. add_user
moves from server.rs to a method on BlogdorTheAggregator.

---
 migrations/0002_feeds.down.sql            |   1 -
 migrations/0002_feeds.up.sql              |   9 -
 migrations/0002_feeds_and_status.down.sql |   2 +
 migrations/0002_feeds_and_status.up.sql   |  16 ++
 migrations/0003_runs.up.sql               |   8 +-
 migrations/0004_fetches.down.sql          |   1 -
 migrations/0004_fetches.up.sql            |   6 -
 src/db.rs                                 |  53 +-----
 src/lib.rs                                | 208 +++++++++++++---------
 src/main.rs                               |   2 +-
 src/server.rs                             |  29 +---
 11 files changed, 151 insertions(+), 184 deletions(-)
 delete mode 100644 migrations/0002_feeds.down.sql
 delete mode 100644 migrations/0002_feeds.up.sql
 create mode 100644 migrations/0002_feeds_and_status.down.sql
 create mode 100644 migrations/0002_feeds_and_status.up.sql
 delete mode 100644 migrations/0004_fetches.down.sql
 delete mode 100644 migrations/0004_fetches.up.sql

diff --git a/migrations/0002_feeds.down.sql b/migrations/0002_feeds.down.sql
deleted file mode 100644
index e47ecee..0000000
--- a/migrations/0002_feeds.down.sql
+++ /dev/null
@@ -1 +0,0 @@
-DROP TABLE IF EXISTS feeds;
diff --git a/migrations/0002_feeds.up.sql b/migrations/0002_feeds.up.sql
deleted file mode 100644
index 62dff28..0000000
--- a/migrations/0002_feeds.up.sql
+++ /dev/null
@@ -1,9 +0,0 @@
-CREATE TABLE IF NOT EXISTS feeds (
-    id INTEGER PRIMARY KEY,
-    url TEXT UNIQUE NOT NULL,
-    added_by INT NOT NULL,
-    active BOOLEAN NOT NULL DEFAULT FALSE,
-    created_at DATETIME NOT NULL DEFAULT current_timestamp,
-    updated_at DATETIME NOT NULL DEFAULT current_timestamp,
-    FOREIGN KEY (added_by) REFERENCES users(zulip_id)
-);
diff --git a/migrations/0002_feeds_and_status.down.sql b/migrations/0002_feeds_and_status.down.sql
new file mode 100644
index 0000000..bd58015
--- /dev/null
+++ b/migrations/0002_feeds_and_status.down.sql
@@ -0,0 +1,2 @@
+DROP TABLE IF EXISTS status;
+DROP TABLE IF EXISTS feeds;
diff --git a/migrations/0002_feeds_and_status.up.sql b/migrations/0002_feeds_and_status.up.sql
new file mode 100644
index 0000000..f0b14ff
--- /dev/null
+++ b/migrations/0002_feeds_and_status.up.sql
@@ -0,0 +1,16 @@
+CREATE TABLE IF NOT EXISTS feeds (
+    id INTEGER NOT NULL PRIMARY KEY,
+    url TEXT NOT NULL,
+    owner INT NOT NULL,
+    created DATETIME NOT NULL DEFAULT current_timestamp,
+    FOREIGN KEY (owner) REFERENCES users(zulip_id),
+    UNIQUE(url, owner)
+);
+
+CREATE TABLE IF NOT EXISTS status (
+    id INTEGER NOT NULL PRIMARY KEY,
+    feed INTEGER NOT NULL,
+    updated DATETIME NOT NULL DEFAULT current_timestamp,
+    active BOOLEAN NOT NULL DEFAULT FALSE,
+    FOREIGN KEY (feed) REFERENCES feeds(id)
+);
diff --git a/migrations/0003_runs.up.sql b/migrations/0003_runs.up.sql
index 79281dd..627d3cb 100644
--- a/migrations/0003_runs.up.sql
+++ b/migrations/0003_runs.up.sql
@@ -1,6 +1,8 @@
-CREATE TABLE IF NOT EXISTS successful_runs (
-    id INTEGER PRIMARY KEY,
-    date_time DATETIME NOT NULL DEFAULT current_timestamp,
+CREATE TABLE IF NOT EXISTS runs (
+    id INTEGER NOT NULL PRIMARY KEY,
+    run DATETIME NOT NULL DEFAULT current_timestamp,
     feed INTEGER NOT NULL,
+    fetched DATETIME,
+    posted DATETIME,
     FOREIGN KEY (feed) REFERENCES feeds(id)
 );
diff --git a/migrations/0004_fetches.down.sql b/migrations/0004_fetches.down.sql
deleted file mode 100644
index ef89582..0000000
--- a/migrations/0004_fetches.down.sql
+++ /dev/null
@@ -1 +0,0 @@
-DROP TABLE IF EXISTS fetches;
diff --git a/migrations/0004_fetches.up.sql b/migrations/0004_fetches.up.sql
deleted file mode 100644
index ae72478..0000000
--- a/migrations/0004_fetches.up.sql
+++ /dev/null
@@ -1,6 +0,0 @@
-CREATE TABLE IF NOT EXISTS fetches (
-    id INTEGER PRIMARY KEY,
-    feed INT NOT NULL,
-    fetched DATETIME NOT NULL DEFAULT current_timestamp,
-    FOREIGN KEY (feed) REFERENCES feeds(id)
-);
diff --git a/src/db.rs b/src/db.rs
index 9fbdfe9..cd8c76e 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -5,65 +5,16 @@ const TIMEOUT: u64 = 2000; // in milliseconds
 use std::time::Duration;
 
 use sqlx::{
-    Sqlite, SqlitePool,
-    query::Query,
-    sqlite::{
-        SqliteArguments, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteRow,
-    },
+    SqlitePool,
+    sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
 };
 
 use crate::BlogdorTheAggregator;
 
-pub enum DbAction<'q> {
-    Execute(Query<'q, Sqlite, SqliteArguments<'q>>),
-    FetchOne(Query<'q, Sqlite, SqliteArguments<'q>>),
-    FetchMany(Query<'q, Sqlite, SqliteArguments<'q>>),
-    FetchOptional(Query<'q, Sqlite, SqliteArguments<'q>>),
-}
-
-pub enum DbValue {
-    None,
-    Optional(Option<SqliteRow>),
-    One(SqliteRow),
-    Many(Vec<SqliteRow>),
-}
-
 impl BlogdorTheAggregator {
     pub async fn close_db(&self) {
        self.db.close().await;
    }
-
-    pub async fn db_action<'q>(&self, query: DbAction<'q>) -> Result<DbValue, String> {
-        match query {
-            DbAction::Execute(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                q.execute(&self.db).await.map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::None)
-            }
-            DbAction::FetchOne(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                let r = q.fetch_one(&self.db).await.map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::One(r))
-            }
-            DbAction::FetchMany(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                let r = q.fetch_all(&self.db).await.map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::Many(r))
-            }
-            DbAction::FetchOptional(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                let r = q
-                    .fetch_optional(&self.db)
-                    .await
-                    .map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::Optional(r))
-            }
-        }
-    }
 }
 
 pub async fn get_db_pool() -> SqlitePool {
diff --git a/src/lib.rs b/src/lib.rs
index 664c203..6cc0d38 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,7 +5,7 @@ use reqwest::{Client, Response, StatusCode};
 use serde::{Deserialize, Serialize};
 use server::ServerState;
 use sqlx::{
-    FromRow, Row, SqlitePool,
+    FromRow, SqlitePool,
     types::chrono::{DateTime, Utc},
 };
 use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
@@ -13,7 +13,6 @@ use tokio_util::{bytes::Buf, sync::CancellationToken};
 use unicode_segmentation::UnicodeSegmentation;
 
 mod db;
-use db::{DbAction, DbValue};
 
 pub mod server;
 
@@ -25,11 +24,17 @@ const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
 const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);
 
 const ADD_FEED_QUERY: &str = "";
-const ACTIVE_FEEDS_QUERY: &str = "select id, url, created_at from feeds where active = true";
-const STALE_FEEDS_QUERY: &str =
-    "select id, url, added_by, created_at from feeds where active = true";
-const FETCH_RUN_QUERY: &str =
-    "select date_time from successful_runs where feed = ? order by id desc limit 1";
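+// Latest run per feed, for feeds whose newest status row is active. The bare
+// run/fetched/posted columns alongside MAX(id) come from the max-id row, which
+// SQLite guarantees for queries with a single min()/max() aggregate.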
order by id desc limit 1"; +const ACTIVE_FEEDS_QUERY: &str = r#"SELECT id, url, owner, created, fetched, posted, run, feed FROM feeds + INNER JOIN + (SELECT feed, MAX(id) _, run, fetched, posted FROM runs WHERE feed IN + (SELECT feed FROM (SELECT feed, MAX(id), active FROM status GROUP BY feed) WHERE active = TRUE) + GROUP BY feed) r + ON feeds.id = r.feed"#; pub struct BlogdorTheAggregator { db: SqlitePool, @@ -48,6 +48,7 @@ pub struct FeedEntry { post_url: String, feed_url: String, feed_id: i64, + owner: i64, title: String, published: DateTime, received: DateTime, @@ -62,10 +63,17 @@ pub struct FeedResult { pub feed_id: i64, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct NewFeed { - feed: String, - user: String, + url: String, + owner: String, + result_sender: UnboundedSender>, +} + +impl PartialEq for NewFeed { + fn eq(&self, other: &Self) -> bool { + self.url == other.url && self.owner == other.owner + } } #[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)] @@ -91,13 +99,16 @@ pub struct ActiveFeed { url: String, id: i64, owner: i64, - added: DateTime, + created: DateTime, + updated: DateTime, #[sqlx(flatten)] last_run: FeedRun, } #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, FromRow)] pub struct FeedRun { + feed: i64, + run: DateTime, fetched: Option>, posted: Option>, } @@ -140,47 +151,32 @@ impl BlogdorTheAggregator { self.cancel.cancelled().await } - pub async fn spawn_http(&self, announce_tx: UnboundedSender, client: reqwest::Client) { + pub async fn spawn_http(&self, announce_tx: UnboundedSender) { let state = ServerState::new( self.db.clone(), &self.zulip_to_blogdor_email, &self.blogdor_token, announce_tx, - client, + self.client.clone(), ); server::spawn_server(state, self.cancel.clone()).await; } pub async fn check_feeds(&self) -> Result, String> { tracing::debug!("checking feeds"); - let feed_query = sqlx::query(ACTIVE_FEEDS_QUERY); let feeds = self - .db_action(DbAction::FetchMany(feed_query)) + .active_feeds() .await - .map_err(|e| { - tracing::error!("got error getting feeds from DB: {e}"); - "couldn't get active feeds".to_string() - })?; - let DbValue::Many(feeds) = feeds else { - unreachable!() - }; + .map_err(|_| "could not check feeds".to_string())?; let mut handles = JoinSet::new(); for feed in feeds { - let id = feed.get("id"); - let url = feed.get("url"); - let created_at: DateTime = feed.get("created_at"); - let last = if let Ok(v) = self - .db_action(DbAction::FetchOne(sqlx::query(FETCH_RUN_QUERY))) - .await - { - let DbValue::One(r) = v else { unreachable!() }; - r.get("date_time") - } else { - created_at - }; - - handles.spawn(check_feed(self.client.clone(), id, url, last)); + let id = feed.id; + let url = feed.url; + let created_at = feed.created; + let last_run = feed.last_run; + let last = last_run.posted.unwrap_or(created_at); + handles.spawn(check_feed(self.client.clone(), id, url, last, feed.owner)); } let mut feed_results = Vec::new(); @@ -195,7 +191,7 @@ impl BlogdorTheAggregator { tracing::error!("got error fetching feed: {e}"); continue; }; - //self.db_action(DbAction::Execute(sqlx::query("insert into succ"))); + feed_results.push(feed_result); } @@ -203,12 +199,9 @@ impl BlogdorTheAggregator { } pub async fn check_stale(&self) { - let feeds = match sqlx::query!("select id, url, added_by, created_at from feeds") - .fetch_all(&self.db) - .await - { - Err(e) => { - tracing::error!("could not fetch feeds: {e}"); + let feeds = match self.active_feeds().await { + Err(_) => { + 
tracing::error!("could not check stale feeds"); return; } Ok(f) => f, @@ -217,27 +210,11 @@ impl BlogdorTheAggregator { let now = Utc::now(); for feed in feeds.into_iter() { - let id = feed.id; let url = &feed.url; - let user = feed.added_by; - let fetched = match sqlx::query!( - "select fetched from fetches where feed = ? order by id desc limit 1", - id - ) - .fetch_optional(&self.db) - .await - { - Err(e) => { - tracing::error!("could not get last fetched for {url} from db: {e}"); - continue; - } - Ok(f) => f, - }; - let dur = if let Some(fetched) = fetched { - now - fetched.fetched.and_utc() - } else { - now - feed.created_at.and_utc() - }; + let user = feed.owner; + let run = feed.last_run; + let last = run.fetched.unwrap_or(feed.created); + let dur = now - last; if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 { let hours = dur.num_hours() % 24; @@ -260,8 +237,11 @@ impl BlogdorTheAggregator { } } - pub async fn announce_feed(&self, announce: &NewFeed) { - let content = format!("{} added a new feed: {}", announce.user, announce.feed); + pub async fn add_feed(&self, announce: &NewFeed) { + let content = format!( + "@**|{}**: added a new feed: {}", + announce.owner, announce.url + ); let msg = ZulipMessage { to: self.channel_id, typ: MessageType::Stream, @@ -283,17 +263,13 @@ impl BlogdorTheAggregator { // will also update the successful_runs table if it posts to zulip pub async fn post_entries(&self, posts: &[FeedEntry]) { let FeedEntry { - feed_id, received, .. + feed_id, + received, + owner, + .. } = posts.last().unwrap(); let mut success = true; - let Ok(user) = sqlx::query!("select added_by from feeds where id = ?", feed_id) - .fetch_one(&self.db) - .await - else { - tracing::error!("could not get user from db"); - return; - }; - let user = user.added_by; + for post in posts.iter() { let body = post.body.as_deref().unwrap_or(""); @@ -306,7 +282,7 @@ impl BlogdorTheAggregator { let url = post.post_url.as_str(); let title = post.title.as_str(); - let header = format!("New post in a feed added by @**|{user}**: {title}"); + let header = format!("New post in a feed added by @**|{owner}**: {title}"); let content = format!( "{header}\n---\n{body}{tail}\n\n---\noriginally posted to {url}, on {}", @@ -336,17 +312,59 @@ impl BlogdorTheAggregator { } tokio::time::sleep(ZULIP_INTERVAL).await; } - if success - && let Err(e) = sqlx::query!( - "insert into successful_runs (feed, date_time) values (?, ?)", - feed_id, - received - ) + let now = Utc::now(); + let posted = if success { Some(now) } else { None }; + self.record_run(*feed_id, *received, posted).await; + } + + async fn record_run(&self, feed: i64, fetched: DateTime, posted: Option>) { + let Ok(db_posted) = sqlx::query!( + "select posted from runs where feed = ? 
order by id desc limit 1", + feed + ) + .fetch_optional(&self.db) + .await + else { + tracing::error!("got db error fetching runs"); + return; + }; + let db_posted = db_posted.and_then(|p| p.posted.map(|p| p.and_utc())); + let posted = posted.or(db_posted); + if let Err(e) = sqlx::query!( + "insert into runs (fetched, posted) values (?, ?)", + fetched, + posted + ) + .execute(&self.db) + .await + { + tracing::error!("got error adding row to runs: {e}"); + } + } + + async fn add_user(&self, user: u32) -> Result<(), String> { + if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user) .execute(&self.db) .await { - tracing::error!("could not insert run for {feed_id}, got {e}"); + match e { + sqlx::Error::Database(database_error) => { + // the users table has only one constraint, which is a uniqueness one on + // zulip_id, so if it's violated, we don't care, it just means we already have + // that user; if it's not a constraint violation, then something + // else and bad has happened + if database_error.constraint().is_some() { + return Ok(()); + } + } + sqlx::Error::Io(error) => { + tracing::error!("got IO error: {error}"); + return Err("you should maybe retry that".to_string()); + } + _ => return Err("yikes".to_string()), + } } + Ok(()) } async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result { @@ -360,6 +378,17 @@ impl BlogdorTheAggregator { .await .map_err(|e| format!("{e}")) } + + async fn active_feeds(&self) -> Result, ()> { + let feeds: Vec = sqlx::query_as(ACTIVE_FEEDS_QUERY) + .fetch_all(&self.db) + .await + .map_err(|e| { + tracing::error!("error fetching feeds: {e}"); + })?; + + Ok(feeds) + } } trait Posted { @@ -379,13 +408,12 @@ async fn check_feed( feed_id: i64, url: String, last_fetched: DateTime, + owner: i64, ) -> Result { tracing::debug!("checking {url}"); let now = Utc::now(); - let mut feed = fetch_and_parse_feed(&url, &client).await?; - let mut entries = None; feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted())); for post in feed.entries.into_iter().take(5) { @@ -398,6 +426,7 @@ async fn check_feed( .map(|l| l.href) .unwrap_or("".to_string()), feed_id, + owner, feed_url: url.clone(), title: post .title diff --git a/src/main.rs b/src/main.rs index 5b12cad..a56afea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ async fn main() { let bta = BlogdorTheAggregator::new().await; let (tx, rx) = unbounded_channel(); - bta.spawn_http(tx, bta.client()).await; + bta.spawn_http(tx).await; run_loop(&bta, rx).await; diff --git a/src/server.rs b/src/server.rs index c593676..356b9f0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -116,8 +116,8 @@ async fn handle_manage_feed( match add_feed(&state.db, sender_id, command.feed, &state.client).await { Ok(_) => { let _ = state.announce_tx.send(NewFeed { - feed: command.feed.to_string(), - user: sender_full_name, + url: command.feed.to_string(), + owner: sender_full_name, }); resp.insert("content", "Blogdor Says: SUCCESS!".to_string()); } @@ -214,31 +214,6 @@ async fn add_feed( Ok(()) } -async fn add_user(db: &SqlitePool, user: u32) -> Result<(), String> { - if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user) - .execute(db) - .await - { - match e { - sqlx::Error::Database(database_error) => { - // the users table has only one constraint, which is a uniqueness one on - // zulip_id, so if it's violated, we don't care, it just means we already have - // that user; if it's not a constraint violation, then something - // else and bad has happened - if 
-                if database_error.constraint().is_some() {
-                    return Ok(());
-                }
-            }
-            sqlx::Error::Io(error) => {
-                tracing::error!("got IO error: {error}");
-                return Err("you should maybe retry that".to_string());
-            }
-            _ => return Err("yikes".to_string()),
-        }
-    }
-    Ok(())
-}
-
 async fn graceful_shutdown(cancel: CancellationToken) {
     use tokio::signal;
     let ctrl_c = async {