make betterer

This commit is contained in:
Joe 2026-01-02 00:20:41 -08:00
parent 45a56b9fe9
commit 8cbb2ef0c5
3 changed files with 87 additions and 117 deletions

View file

@ -1,8 +1,8 @@
CREATE TABLE IF NOT EXISTS runs ( CREATE TABLE IF NOT EXISTS runs (
id INTEGER NOT NULL PRIMARY KEY, id INTEGER NOT NULL PRIMARY KEY,
run DATETIME NOT NULL DEFAULT current_timestamp,
feed INTEGER NOT NULL, feed INTEGER NOT NULL,
fetched DATETIME, run DATETIME NOT NULL DEFAULT current_timestamp,
posted DATETIME, fetched DATETIME NOT NULL DEFAULT current_timestamp,
posted DATETIME, -- once this becomes non-null, the program will ensure it will never be null again
FOREIGN KEY (feed) REFERENCES feeds(id) FOREIGN KEY (feed) REFERENCES feeds(id)
); );

View file

@ -56,10 +56,11 @@ pub struct FeedEntry {
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct FeedResult { pub struct FeedRunResult {
pub entries: Option<Vec<FeedEntry>>, pub entries: Vec<FeedEntry>,
pub url: String, pub url: String,
pub feed_id: i64, pub feed_id: i64,
pub fetched: DateTime<Utc>,
} }
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
@ -120,7 +121,7 @@ pub struct ActiveFeed {
pub struct FeedRun { pub struct FeedRun {
feed: i64, feed: i64,
run: DateTime<Utc>, run: DateTime<Utc>,
fetched: Option<DateTime<Utc>>, fetched: DateTime<Utc>,
posted: Option<DateTime<Utc>>, posted: Option<DateTime<Utc>>,
} }
@ -171,7 +172,7 @@ impl BlogdorTheAggregator {
server::spawn_server(state, self.cancel.clone()).await; server::spawn_server(state, self.cancel.clone()).await;
} }
pub async fn check_feeds(&self) -> Result<Vec<FeedResult>, String> { pub async fn check_feeds(&self) -> Result<Vec<FeedRunResult>, String> {
tracing::debug!("checking feeds"); tracing::debug!("checking feeds");
let feeds = self let feeds = self
.active_feeds() .active_feeds()
@ -221,7 +222,7 @@ impl BlogdorTheAggregator {
let url = &feed.url; let url = &feed.url;
let user = feed.owner; let user = feed.owner;
let run = feed.last_run; let run = feed.last_run;
let last = run.fetched.unwrap_or(feed.created); let last = run.fetched;
let dur = now - last; let dur = now - last;
if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 { if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 {
@ -237,7 +238,7 @@ impl BlogdorTheAggregator {
topic: None, topic: None,
}; };
if let Err(e) = self.send_zulip_message(&msg).await { if let Err(e) = self.send_zulip_message(&msg).await {
tracing::error!("error sending zulip message to user {user}: {e}"); tracing::error!("error sending zulip message to user {user} about {url}: {e}");
} else { } else {
tracing::debug!("sent DM to {user} about {url} being fucked"); tracing::debug!("sent DM to {user} about {url} being fucked");
} }
@ -247,7 +248,7 @@ impl BlogdorTheAggregator {
pub async fn process_user_request(&self, user_request: &UserRequest) { pub async fn process_user_request(&self, user_request: &UserRequest) {
match user_request.command.action { match user_request.command.action {
Action::Add => self.add_feed(user_request).await, Action::Add => user_request.result_sender.send(self.add_feed(user_request).await).unwrap_or_default(),
Action::Help => user_request.result_sender.send(Ok("DM or `@blogdor's manager` with `add <feed url, RSS or Atom XML files>`, `remove <feed url originally added by you>`, or `help` to get this message (duh).".to_string())).unwrap_or_default(), Action::Help => user_request.result_sender.send(Ok("DM or `@blogdor's manager` with `add <feed url, RSS or Atom XML files>`, `remove <feed url originally added by you>`, or `help` to get this message (duh).".to_string())).unwrap_or_default(),
Action::Remove => self.remove_feed(user_request).await, Action::Remove => self.remove_feed(user_request).await,
} }
@ -259,126 +260,93 @@ impl BlogdorTheAggregator {
.send(Err("currently unsupported".to_string())); .send(Err("currently unsupported".to_string()));
} }
async fn add_feed(&self, user_request: &UserRequest) { async fn add_feed(&self, user_request: &UserRequest) -> Result<String, String> {
if let Err(e) = self.add_user(user_request.owner).await { self.add_user(user_request.owner).await?;
let _ = user_request.result_sender.send(Err(e));
return;
}
let url = &user_request.command.feed; let url = &user_request.command.feed;
let owner = user_request.owner; let owner = user_request.owner;
let msg; crate::fetch_and_parse_feed(url, &self.client).await?;
if let Err(e) = crate::fetch_and_parse_feed(url, &self.client).await { let resp_text;
let _ = user_request.result_sender.send(Err(e)); if let Some(id) = sqlx::query!(
return;
}
if let Some(id) = match sqlx::query!(
"select id from feeds where owner = ? and url = ?", "select id from feeds where owner = ? and url = ?",
owner, owner,
url url
) )
.fetch_optional(&self.db) .fetch_optional(&self.db)
.await .await
.map_err(|e| {
tracing::error!("couldn't fetch an optional row from the feeds table: {e}");
"whoa, something weird and bad happened".to_string()
})?
.map(|r| r.id)
{ {
Ok(r) => r.map(|r| r.id), sqlx::query!("insert into status (feed, active) values (?, true)", id)
Err(e) => {
tracing::error!("couldn't fetch an optional row from the feeds table: {e}");
let _ = user_request
.result_sender
.send(Err("whoa, something weird and bad happened".to_string()));
return;
}
} {
if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id)
.execute(&self.db) .execute(&self.db)
.await .await
{ .map_err(|e| {
tracing::error!("got error inserting into status: {e}"); tracing::error!("got error inserting into status: {e}");
let _ = user_request.result_sender.send(Err(format!( format!(
"could not activate previously added feed at {} for Zulip user {}", "could not activate previously added feed at {url} for Zulip user {owner}",
url, owner )
))); })?;
return; resp_text = format!("marked previously added feed at {url} as active");
}
msg = format!("marked previously added feed at {url} as active");
} else { } else {
let txn = match self.db.begin().await { let txn = self.db.begin().await.map_err(|e| {
Ok(txn) => txn, tracing::error!("got error begining a transaction: {e}");
Err(e) => { "could not add feed".to_string()
tracing::error!("got error begining a transaction: {e}"); })?;
let _ = user_request
.result_sender let id = sqlx::query!(
.send(Err("could not add feed".to_string()));
return;
}
};
// get the ID for the feed
let id = match sqlx::query!(
"insert into feeds (url, owner) values (?, ?) returning id", "insert into feeds (url, owner) values (?, ?) returning id",
url, url,
owner owner
) )
.fetch_one(&self.db) .fetch_one(&self.db)
.await .await
{ .map_err(|e| {
Err(e) => { tracing::error!("error inserting into feeds: {e}");
tracing::error!("error inserting into feeds: {e}"); "could not add feed".to_string()
let _ = user_request })
.result_sender .map(|i| i.id)?;
.send(Err("could not add feed".to_string()));
return;
}
Ok(id) => id.id,
};
// add a row in status for the new feed sqlx::query!("insert into status (feed, active) values (?, true)", id)
if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id)
.execute(&self.db) .execute(&self.db)
.await .await
{ .map_err(|e| {
tracing::error!("error inserting into status: {e}"); tracing::error!("error inserting into status: {e}");
let _ = user_request "could not add feed".to_string()
.result_sender })?;
.send(Err("could not add feed".to_string()));
return;
}
// need one row in the runs table to allow the inner join in the active feeds // need one row in the runs table to allow the inner join in the active feeds
// query to work // query to work
if let Err(e) = sqlx::query!("insert into runs (feed) values (?)", id) //
// TODO: fix the active feed query so we don't need to do this.
sqlx::query!("insert into runs (feed) values (?)", id)
.execute(&self.db) .execute(&self.db)
.await .await
{ .map_err(|e| {
tracing::error!("error inserting into runs: {e}"); tracing::error!("error inserting into runs: {e}");
let _ = user_request "could not add feed".to_string()
.result_sender })?;
.send(Err("could not add feed".to_string()));
return;
}
// woo! // woo!
if let Err(e) = txn.commit().await { txn.commit().await.map_err(|e| {
tracing::error!("error committing add-feed transaction: {e}"); tracing::error!("error committing add-feed transaction: {e}");
let _ = user_request "could not add feed".to_string()
.result_sender })?;
.send(Err("could not add feed".to_string())); resp_text = format!("added new feed at {url}");
return;
}
msg = format!("added new feed at {url}");
} }
let _ = user_request.result_sender.send(Ok(msg));
let content = format!("@**|{}**: added a new feed: {}", owner, url); let content = format!("@**|{}**: added a new feed: {}", owner, url);
let msg = ZulipMessage { let zmsg = ZulipMessage {
to: self.channel_id, to: self.channel_id,
typ: MessageType::Stream, typ: MessageType::Stream,
content, content,
topic: Some("New feeds"), topic: Some("New feeds"),
}; };
match self.send_zulip_message(&msg).await { match self.send_zulip_message(&zmsg).await {
Err(e) => { Err(e) => {
tracing::error!("got error sending to zulip: {e}"); tracing::error!("got error sending to zulip: {e}");
} }
@ -388,25 +356,35 @@ impl BlogdorTheAggregator {
} }
} }
} }
Ok(resp_text)
} }
// will also update the successful_runs table if it posts to zulip // will also update the runs table with fetched and posted
pub async fn post_entries(&self, posts: &[FeedEntry]) { pub async fn post_entries(&self, feed_run: &FeedRunResult) {
let FeedEntry { let FeedRunResult {
feed_id, feed_id,
received, entries,
owner, url,
.. ..
} = posts.last().unwrap(); } = feed_run;
let mut success = true; let mut success = true;
for post in posts.iter() { if entries.is_empty() {
success = false;
tracing::debug!("no new posts from {url}");
} else {
tracing::debug!("got {} new posts from {url}", entries.len());
}
for post in entries.iter() {
let owner = post.owner;
let body = post.body.as_deref().unwrap_or(""); let body = post.body.as_deref().unwrap_or("");
let tail = if body.len() < ZULIP_MESSAGE_CUTOFF { let tail = if body.len() <= ZULIP_MESSAGE_CUTOFF {
"" ""
} else { } else {
"..." "[...]"
}; };
let url = post.post_url.as_str(); let url = post.post_url.as_str();
@ -442,14 +420,14 @@ impl BlogdorTheAggregator {
} }
tokio::time::sleep(ZULIP_INTERVAL).await; tokio::time::sleep(ZULIP_INTERVAL).await;
} }
let now = Utc::now();
let posted = if success { Some(now) } else { None }; let posted = if success { Some(Utc::now()) } else { None };
self.record_run(*feed_id, *received, posted).await; self.record_run(*feed_id, feed_run.fetched, posted).await;
} }
async fn record_run(&self, feed: i64, fetched: DateTime<Utc>, posted: Option<DateTime<Utc>>) { async fn record_run(&self, feed: i64, fetched: DateTime<Utc>, posted: Option<DateTime<Utc>>) {
let Ok(db_posted) = sqlx::query!( let Ok(db_posted) = sqlx::query!(
"select posted from runs where feed = ? order by id desc limit 1", "select max(posted) posted from runs where feed = ? limit 1",
feed feed
) )
.fetch_optional(&self.db) .fetch_optional(&self.db)
@ -540,12 +518,12 @@ async fn check_feed(
url: String, url: String,
last_fetched: DateTime<Utc>, last_fetched: DateTime<Utc>,
owner: i64, owner: i64,
) -> Result<FeedResult, String> { ) -> Result<FeedRunResult, String> {
tracing::debug!("checking {url}"); tracing::debug!("checking {url}");
let now = Utc::now(); let now = Utc::now();
let mut feed = fetch_and_parse_feed(&url, &client).await?; let mut feed = fetch_and_parse_feed(&url, &client).await?;
let mut entries = None; let mut entries = Vec::new();
feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted())); feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
for post in feed.entries.into_iter().take(5) { for post in feed.entries.into_iter().take(5) {
if post.posted().unwrap_or(LAST_FETCHED) > last_fetched { if post.posted().unwrap_or(LAST_FETCHED) > last_fetched {
@ -577,14 +555,15 @@ async fn check_feed(
}) })
}), }),
}; };
entries.get_or_insert(Vec::new()).push(entry); entries.push(entry);
} }
} }
Ok(FeedResult { Ok(FeedRunResult {
entries, entries,
url, url,
feed_id, feed_id,
fetched: now,
}) })
} }

View file

@ -47,16 +47,7 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut user_req_rx: UnboundedReceiver
match bta.check_feeds().await { match bta.check_feeds().await {
Ok(results) => { Ok(results) => {
for result in results { for result in results {
if let Some(ref posts) = result.entries { bta.post_entries(&result).await;
tracing::debug!(
"got {} new posts from {}",
posts.len(),
result.url
);
bta.post_entries(posts).await;
} else {
tracing::debug!("no new posts from {}", result.url);
}
} }
}, },
// outer check_feeds error // outer check_feeds error