From e7f0a58c8ab88174cc4891dead2d3fd083f6594c Mon Sep 17 00:00:00 2001
From: joe
Date: Fri, 26 Dec 2025 13:09:17 -0800
Subject: [PATCH] add stale feed check and nag

---
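Notes (kept below the --- fold, so git am drops them from the commit
message): tokio::time::interval completes its first tick immediately,
so the new check_stale interval sweeps once at startup and then every
24 hours. If restarts should not trigger an immediate round of nags,
interval_at can push the first tick out by one period. A minimal
sketch, not part of this patch, assuming the same 24-hour period:

    use tokio::time::{interval_at, Duration, Instant};

    // Delay the first stale sweep by one full period instead of
    // letting it fire immediately at startup.
    let period = Duration::from_secs(24 * 60 * 60);
    let mut check_stale = interval_at(Instant::now() + period, period);
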
 migrations/0004_fetches.down.sql |  1 +
 migrations/0004_fetches.up.sql   |  6 +++
 src/lib.rs                       | 80 ++++++++++++++++++++++++++++++--
 src/main.rs                      |  9 +++-
 4 files changed, 91 insertions(+), 5 deletions(-)
 create mode 100644 migrations/0004_fetches.down.sql
 create mode 100644 migrations/0004_fetches.up.sql

diff --git a/migrations/0004_fetches.down.sql b/migrations/0004_fetches.down.sql
new file mode 100644
index 0000000..ef89582
--- /dev/null
+++ b/migrations/0004_fetches.down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS fetches;
diff --git a/migrations/0004_fetches.up.sql b/migrations/0004_fetches.up.sql
new file mode 100644
index 0000000..ae72478
--- /dev/null
+++ b/migrations/0004_fetches.up.sql
@@ -0,0 +1,6 @@
+CREATE TABLE IF NOT EXISTS fetches (
+    id INTEGER PRIMARY KEY,
+    feed INT NOT NULL,
+    fetched DATETIME NOT NULL DEFAULT current_timestamp,
+    FOREIGN KEY (feed) REFERENCES feeds(id)
+);
diff --git a/src/lib.rs b/src/lib.rs
index 422333f..92694d6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -23,6 +23,8 @@ const ZULIP_MESSAGE_CUTOFF: usize = 700;
 
 const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
 
+const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);
+
 pub struct BlogdorTheAggregator {
     db: SqlitePool,
     client: reqwest::Client,
@@ -64,12 +66,20 @@ pub struct NewFeed {
 struct ZulipMessage<'s> {
     to: u32,
     #[serde(rename = "type")]
-    typ: &'s str,
+    typ: MessageType,
     content: String,
     #[serde(skip_serializing_if = "Option::is_none")]
     topic: Option<&'s str>,
 }
 
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
+#[serde(rename_all = "lowercase")]
+enum MessageType {
+    #[default]
+    Stream,
+    Direct,
+}
+
 impl BlogdorTheAggregator {
     pub async fn new() -> Self {
         let db = get_db_pool().await;
@@ -144,11 +154,67 @@
         Ok(feed_results)
     }
 
+    pub async fn check_stale(&self) {
+        let feeds = match sqlx::query!("select id, url, added_by, created_at from feeds")
+            .fetch_all(&self.db)
+            .await
+        {
+            Err(e) => {
+                tracing::error!("could not fetch feeds: {e}");
+                return;
+            }
+            Ok(f) => f,
+        };
+
+        let now = Utc::now();
+
+        for feed in feeds.into_iter() {
+            let id = feed.id;
+            let url = &feed.url;
+            let user = feed.added_by;
+            let fetched = match sqlx::query!(
+                "select fetched from fetches where feed = ? order by id desc limit 1",
+                id
+            )
+            .fetch_optional(&self.db)
+            .await
+            {
+                Err(e) => {
+                    tracing::error!("could not get last fetched for {url} from db: {e}");
+                    continue;
+                }
+                Ok(f) => f,
+            };
+            let dur = if let Some(fetched) = fetched {
+                now - fetched.fetched.and_utc()
+            } else {
+                now - feed.created_at.and_utc()
+            };
+
+            if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 {
+                let hours = dur.num_hours() % 24;
+                let days = dur.num_days();
+                let content = format!(
+                    "It has been {days} days and {hours} hours since Blogdor was able to check your feed at {url}. If you'd like to remove it, you can DM @blogdor's manager with 'remove {url}'."
+                );
+                let msg = ZulipMessage {
+                    to: user as u32,
+                    typ: MessageType::Direct,
+                    content,
+                    topic: None,
+                };
+                if let Err(e) = self.send_zulip_message(&msg).await {
+                    tracing::error!("error sending zulip message to user {user}: {e}");
+                }
+            }
+        }
+    }
+
     pub async fn announce_feed(&self, announce: &NewFeed) {
         let content = format!("{} added a new feed: {}", announce.user, announce.feed);
         let msg = ZulipMessage {
             to: self.channel_id,
-            typ: "stream",
+            typ: MessageType::Stream,
             content,
             topic: Some("New feeds"),
         };
@@ -199,7 +265,7 @@
 
         let msg = ZulipMessage {
             to: self.channel_id,
-            typ: "stream",
+            typ: MessageType::Stream,
             content,
             topic: Some(title),
         };
@@ -290,6 +356,14 @@ async fn check_feed(
         .map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
     let mut feed =
         parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;
+
+    if let Err(e) = sqlx::query!("insert into fetches (feed) values (?)", feed_id)
+        .execute(&db)
+        .await
+    {
+        tracing::error!("got error inserting {feed_id} into fetches: {e}");
+    }
+
     feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
     for post in feed.entries.into_iter().take(5) {
         if post.posted().unwrap_or(LAST_FETCHED) > last_fetched {
diff --git a/src/main.rs b/src/main.rs
index 3bfe76e..fa3660a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -32,7 +32,9 @@ fn init_logs() {
 }
 
 async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
-    let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE);
+    let mut check_feeds = tokio::time::interval(BLOGDOR_SNOOZE);
+    let mut check_stale = tokio::time::interval(Duration::from_hours(24));
+
     loop {
         tokio::select! {
             biased;
@@ -41,7 +43,7 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
                     bta.announce_feed(&announce).await;
                 }
             }
-            _ = alarm.tick() => {
+            _ = check_feeds.tick() => {
                 match bta.check_feeds().await {
                     Ok(results) => {
                         for result in results {
@@ -71,6 +73,9 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
                     }
                 }
             }
+            _ = check_stale.tick() => {
+                bta.check_stale().await;
+            }
             _ = bta.cancelled() => {
                 tracing::info!("shutting down the aggregation loop");
                 break;
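
-- 
Reviewer note, outside the diff: the nag wording is driven by one
chrono duration split two ways, num_days() for the whole days and
num_hours() % 24 for the leftover hours. A worked example of that
arithmetic, illustrative only and not part of this patch:

    use chrono::TimeDelta;

    // 100,000 seconds is 27h46m40s: one whole day plus three whole
    // leftover hours, so the message would read "1 days and 3 hours".
    let dur = TimeDelta::seconds(100_000);
    assert_eq!(dur.num_days(), 1);
    assert_eq!(dur.num_hours() % 24, 3);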