From 8cbb2ef0c5efefeb41c6c68711a4ccdff5638a25 Mon Sep 17 00:00:00 2001 From: Joe Date: Fri, 2 Jan 2026 00:20:41 -0800 Subject: [PATCH] make betterer --- migrations/0003_runs.up.sql | 6 +- src/lib.rs | 187 ++++++++++++++++-------------------- src/main.rs | 11 +-- 3 files changed, 87 insertions(+), 117 deletions(-) diff --git a/migrations/0003_runs.up.sql b/migrations/0003_runs.up.sql index 627d3cb..1a48cec 100644 --- a/migrations/0003_runs.up.sql +++ b/migrations/0003_runs.up.sql @@ -1,8 +1,8 @@ CREATE TABLE IF NOT EXISTS runs ( id INTEGER NOT NULL PRIMARY KEY, - run DATETIME NOT NULL DEFAULT current_timestamp, feed INTEGER NOT NULL, - fetched DATETIME, - posted DATETIME, + run DATETIME NOT NULL DEFAULT current_timestamp, + fetched DATETIME NOT NULL DEFAULT current_timestamp, + posted DATETIME, -- once this becomes non-null, the program will ensure it will never be null again FOREIGN KEY (feed) REFERENCES feeds(id) ); diff --git a/src/lib.rs b/src/lib.rs index 997f272..69909b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,10 +56,11 @@ pub struct FeedEntry { } #[derive(Debug, Clone, PartialEq, Eq)] -pub struct FeedResult { - pub entries: Option>, +pub struct FeedRunResult { + pub entries: Vec, pub url: String, pub feed_id: i64, + pub fetched: DateTime, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -120,7 +121,7 @@ pub struct ActiveFeed { pub struct FeedRun { feed: i64, run: DateTime, - fetched: Option>, + fetched: DateTime, posted: Option>, } @@ -171,7 +172,7 @@ impl BlogdorTheAggregator { server::spawn_server(state, self.cancel.clone()).await; } - pub async fn check_feeds(&self) -> Result, String> { + pub async fn check_feeds(&self) -> Result, String> { tracing::debug!("checking feeds"); let feeds = self .active_feeds() @@ -221,7 +222,7 @@ impl BlogdorTheAggregator { let url = &feed.url; let user = feed.owner; let run = feed.last_run; - let last = run.fetched.unwrap_or(feed.created); + let last = run.fetched; let dur = now - last; if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 { @@ -237,7 +238,7 @@ impl BlogdorTheAggregator { topic: None, }; if let Err(e) = self.send_zulip_message(&msg).await { - tracing::error!("error sending zulip message to user {user}: {e}"); + tracing::error!("error sending zulip message to user {user} about {url}: {e}"); } else { tracing::debug!("sent DM to {user} about {url} being fucked"); } @@ -247,7 +248,7 @@ impl BlogdorTheAggregator { pub async fn process_user_request(&self, user_request: &UserRequest) { match user_request.command.action { - Action::Add => self.add_feed(user_request).await, + Action::Add => user_request.result_sender.send(self.add_feed(user_request).await).unwrap_or_default(), Action::Help => user_request.result_sender.send(Ok("DM or `@blogdor's manager` with `add `, `remove `, or `help` to get this message (duh).".to_string())).unwrap_or_default(), Action::Remove => self.remove_feed(user_request).await, } @@ -259,126 +260,93 @@ impl BlogdorTheAggregator { .send(Err("currently unsupported".to_string())); } - async fn add_feed(&self, user_request: &UserRequest) { - if let Err(e) = self.add_user(user_request.owner).await { - let _ = user_request.result_sender.send(Err(e)); - return; - } + async fn add_feed(&self, user_request: &UserRequest) -> Result { + self.add_user(user_request.owner).await?; let url = &user_request.command.feed; let owner = user_request.owner; - let msg; + crate::fetch_and_parse_feed(url, &self.client).await?; - if let Err(e) = crate::fetch_and_parse_feed(url, &self.client).await { - let _ = user_request.result_sender.send(Err(e)); - return; - } - - if let Some(id) = match sqlx::query!( + let resp_text; + if let Some(id) = sqlx::query!( "select id from feeds where owner = ? and url = ?", owner, url ) .fetch_optional(&self.db) .await + .map_err(|e| { + tracing::error!("couldn't fetch an optional row from the feeds table: {e}"); + "whoa, something weird and bad happened".to_string() + })? + .map(|r| r.id) { - Ok(r) => r.map(|r| r.id), - Err(e) => { - tracing::error!("couldn't fetch an optional row from the feeds table: {e}"); - let _ = user_request - .result_sender - .send(Err("whoa, something weird and bad happened".to_string())); - return; - } - } { - if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id) + sqlx::query!("insert into status (feed, active) values (?, true)", id) .execute(&self.db) .await - { - tracing::error!("got error inserting into status: {e}"); - let _ = user_request.result_sender.send(Err(format!( - "could not activate previously added feed at {} for Zulip user {}", - url, owner - ))); - return; - } - msg = format!("marked previously added feed at {url} as active"); + .map_err(|e| { + tracing::error!("got error inserting into status: {e}"); + format!( + "could not activate previously added feed at {url} for Zulip user {owner}", + ) + })?; + resp_text = format!("marked previously added feed at {url} as active"); } else { - let txn = match self.db.begin().await { - Ok(txn) => txn, - Err(e) => { - tracing::error!("got error begining a transaction: {e}"); - let _ = user_request - .result_sender - .send(Err("could not add feed".to_string())); - return; - } - }; - // get the ID for the feed - let id = match sqlx::query!( + let txn = self.db.begin().await.map_err(|e| { + tracing::error!("got error begining a transaction: {e}"); + "could not add feed".to_string() + })?; + + let id = sqlx::query!( "insert into feeds (url, owner) values (?, ?) returning id", url, owner ) .fetch_one(&self.db) .await - { - Err(e) => { - tracing::error!("error inserting into feeds: {e}"); - let _ = user_request - .result_sender - .send(Err("could not add feed".to_string())); - return; - } - Ok(id) => id.id, - }; + .map_err(|e| { + tracing::error!("error inserting into feeds: {e}"); + "could not add feed".to_string() + }) + .map(|i| i.id)?; - // add a row in status for the new feed - if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id) + sqlx::query!("insert into status (feed, active) values (?, true)", id) .execute(&self.db) .await - { - tracing::error!("error inserting into status: {e}"); - let _ = user_request - .result_sender - .send(Err("could not add feed".to_string())); - return; - } + .map_err(|e| { + tracing::error!("error inserting into status: {e}"); + "could not add feed".to_string() + })?; // need one row in the runs table to allow the inner join in the active feeds // query to work - if let Err(e) = sqlx::query!("insert into runs (feed) values (?)", id) + // + // TODO: fix the active feed query so we don't need to do this. + sqlx::query!("insert into runs (feed) values (?)", id) .execute(&self.db) .await - { - tracing::error!("error inserting into runs: {e}"); - let _ = user_request - .result_sender - .send(Err("could not add feed".to_string())); - return; - } + .map_err(|e| { + tracing::error!("error inserting into runs: {e}"); + "could not add feed".to_string() + })?; // woo! - if let Err(e) = txn.commit().await { + txn.commit().await.map_err(|e| { tracing::error!("error committing add-feed transaction: {e}"); - let _ = user_request - .result_sender - .send(Err("could not add feed".to_string())); - return; - } - msg = format!("added new feed at {url}"); + "could not add feed".to_string() + })?; + resp_text = format!("added new feed at {url}"); } - let _ = user_request.result_sender.send(Ok(msg)); let content = format!("@**|{}**: added a new feed: {}", owner, url); - let msg = ZulipMessage { + let zmsg = ZulipMessage { to: self.channel_id, typ: MessageType::Stream, content, topic: Some("New feeds"), }; - match self.send_zulip_message(&msg).await { + match self.send_zulip_message(&zmsg).await { Err(e) => { tracing::error!("got error sending to zulip: {e}"); } @@ -388,25 +356,35 @@ impl BlogdorTheAggregator { } } } + Ok(resp_text) } - // will also update the successful_runs table if it posts to zulip - pub async fn post_entries(&self, posts: &[FeedEntry]) { - let FeedEntry { + // will also update the runs table with fetched and posted + pub async fn post_entries(&self, feed_run: &FeedRunResult) { + let FeedRunResult { feed_id, - received, - owner, + entries, + url, .. - } = posts.last().unwrap(); + } = feed_run; + let mut success = true; - for post in posts.iter() { + if entries.is_empty() { + success = false; + tracing::debug!("no new posts from {url}"); + } else { + tracing::debug!("got {} new posts from {url}", entries.len()); + } + + for post in entries.iter() { + let owner = post.owner; let body = post.body.as_deref().unwrap_or(""); - let tail = if body.len() < ZULIP_MESSAGE_CUTOFF { + let tail = if body.len() <= ZULIP_MESSAGE_CUTOFF { "" } else { - "..." + "[...]" }; let url = post.post_url.as_str(); @@ -442,14 +420,14 @@ impl BlogdorTheAggregator { } tokio::time::sleep(ZULIP_INTERVAL).await; } - let now = Utc::now(); - let posted = if success { Some(now) } else { None }; - self.record_run(*feed_id, *received, posted).await; + + let posted = if success { Some(Utc::now()) } else { None }; + self.record_run(*feed_id, feed_run.fetched, posted).await; } async fn record_run(&self, feed: i64, fetched: DateTime, posted: Option>) { let Ok(db_posted) = sqlx::query!( - "select posted from runs where feed = ? order by id desc limit 1", + "select max(posted) posted from runs where feed = ? limit 1", feed ) .fetch_optional(&self.db) @@ -540,12 +518,12 @@ async fn check_feed( url: String, last_fetched: DateTime, owner: i64, -) -> Result { +) -> Result { tracing::debug!("checking {url}"); let now = Utc::now(); let mut feed = fetch_and_parse_feed(&url, &client).await?; - let mut entries = None; + let mut entries = Vec::new(); feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted())); for post in feed.entries.into_iter().take(5) { if post.posted().unwrap_or(LAST_FETCHED) > last_fetched { @@ -577,14 +555,15 @@ async fn check_feed( }) }), }; - entries.get_or_insert(Vec::new()).push(entry); + entries.push(entry); } } - Ok(FeedResult { + Ok(FeedRunResult { entries, url, feed_id, + fetched: now, }) } diff --git a/src/main.rs b/src/main.rs index dabdd0c..e3df797 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,16 +47,7 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut user_req_rx: UnboundedReceiver match bta.check_feeds().await { Ok(results) => { for result in results { - if let Some(ref posts) = result.entries { - tracing::debug!( - "got {} new posts from {}", - posts.len(), - result.url - ); - bta.post_entries(posts).await; - } else { - tracing::debug!("no new posts from {}", result.url); - } + bta.post_entries(&result).await; } }, // outer check_feeds error