From 18c8749f917339b8c9e4442a1abc87f6e253a041 Mon Sep 17 00:00:00 2001 From: Joe Date: Thu, 1 Jan 2026 19:02:57 -0800 Subject: [PATCH] everything seems to work, need to add tests --- sample-data/invalid-url.json | 10 +++ src/lib.rs | 118 ++++++++++++++++++++++++++++--- src/server.rs | 133 +++++++++++++---------------------- 3 files changed, 168 insertions(+), 93 deletions(-) create mode 100644 sample-data/invalid-url.json diff --git a/sample-data/invalid-url.json b/sample-data/invalid-url.json new file mode 100644 index 0000000..dea3fec --- /dev/null +++ b/sample-data/invalid-url.json @@ -0,0 +1,10 @@ +{ + "bot_email": "blogdor-outgoing-bot@zulip-host", + "token": "another_token", + "message": { + "sender_email": "sender-email", + "sender_id": 1, + "sender_full_name": "magoo", + "content": "@**blogdor's manager** add invalid" + } +} diff --git a/src/lib.rs b/src/lib.rs index 6cc0d38..b8388d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,6 @@ const LAST_FETCHED: DateTime = DateTime::from_timestamp_nanos(0); const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24); -const ADD_FEED_QUERY: &str = ""; const ACTIVE_FEEDS_QUERY: &str = r#"SELECT id, url, owner, created, fetched, posted, run, feed FROM feeds INNER JOIN (SELECT feed, MAX(id) _, run, fetched, posted FROM runs WHERE feed IN @@ -66,7 +65,7 @@ pub struct FeedResult { #[derive(Debug, Clone)] pub struct NewFeed { url: String, - owner: String, + owner: u32, result_sender: UnboundedSender>, } @@ -100,7 +99,6 @@ pub struct ActiveFeed { id: i64, owner: i64, created: DateTime, - updated: DateTime, #[sqlx(flatten)] last_run: FeedRun, } @@ -157,7 +155,6 @@ impl BlogdorTheAggregator { &self.zulip_to_blogdor_email, &self.blogdor_token, announce_tx, - self.client.clone(), ); server::spawn_server(state, self.cancel.clone()).await; } @@ -173,9 +170,8 @@ impl BlogdorTheAggregator { for feed in feeds { let id = feed.id; let url = feed.url; - let created_at = feed.created; let last_run = feed.last_run; - let last = last_run.posted.unwrap_or(created_at); + let last = last_run.posted.unwrap_or(LAST_FETCHED); handles.spawn(check_feed(self.client.clone(), id, url, last, feed.owner)); } @@ -237,10 +233,113 @@ impl BlogdorTheAggregator { } } - pub async fn add_feed(&self, announce: &NewFeed) { + pub async fn add_feed(&self, add_request: &NewFeed) { + if let Err(e) = self.add_user(add_request.owner).await { + let _ = add_request.result_sender.send(Err(e)); + return; + } + + if let Err(e) = crate::fetch_and_parse_feed(&add_request.url, &self.client).await { + let _ = add_request.result_sender.send(Err(e)); + return; + } + + if let Some(id) = match sqlx::query!( + "select id from feeds where owner = ? and url = ?", + add_request.url, + add_request.owner + ) + .fetch_optional(&self.db) + .await + { + Ok(r) => r.map(|r| r.id), + Err(e) => { + tracing::error!("couldn't fetch an optional row from the feeds table: {e}"); + let _ = add_request + .result_sender + .send(Err("whoa, something weird and bad happened".to_string())); + return; + } + } { + if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id) + .execute(&self.db) + .await + { + tracing::error!("got error inserting into status: {e}"); + let _ = add_request.result_sender.send(Err(format!( + "could not activate previously added feed at {} for Zulip user {}", + &add_request.url, add_request.owner + ))); + } + } else { + let txn = match self.db.begin().await { + Ok(txn) => txn, + Err(e) => { + tracing::error!("got error begining a transaction: {e}"); + let _ = add_request + .result_sender + .send(Err("could not add feed".to_string())); + return; + } + }; + // get the ID for the feed + let id = match sqlx::query!( + "insert into feeds (url, owner) values (?, ?) returning id", + add_request.url, + add_request.owner + ) + .fetch_one(&self.db) + .await + { + Err(e) => { + tracing::error!("error inserting into feeds: {e}"); + let _ = add_request + .result_sender + .send(Err("could not add feed".to_string())); + return; + } + Ok(id) => id.id, + }; + + // add a row in status for the new feed + if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id) + .execute(&self.db) + .await + { + tracing::error!("error inserting into status: {e}"); + let _ = add_request + .result_sender + .send(Err("could not add feed".to_string())); + return; + } + + // need one row in the runs table to allow the inner join in the active feeds + // query to work + if let Err(e) = sqlx::query!("insert into runs (feed) values (?)", id) + .execute(&self.db) + .await + { + tracing::error!("error inserting into runs: {e}"); + let _ = add_request + .result_sender + .send(Err("could not add feed".to_string())); + return; + } + + // woo! + if let Err(e) = txn.commit().await { + tracing::error!("error committing add-feed transaction: {e}"); + let _ = add_request + .result_sender + .send(Err("could not add feed".to_string())); + return; + } + } + let _ = add_request.result_sender.send(Ok(())); + let content = format!( "@**|{}**: added a new feed: {}", - announce.owner, announce.url + add_request.owner, add_request.url ); let msg = ZulipMessage { to: self.channel_id, @@ -331,7 +430,8 @@ impl BlogdorTheAggregator { let db_posted = db_posted.and_then(|p| p.posted.map(|p| p.and_utc())); let posted = posted.or(db_posted); if let Err(e) = sqlx::query!( - "insert into runs (fetched, posted) values (?, ?)", + "insert into runs (feed, fetched, posted) values (?, ?, ?)", + feed, fetched, posted ) diff --git a/src/server.rs b/src/server.rs index 356b9f0..d1f93c8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,7 +10,7 @@ use axum::{ use serde::Deserialize; use serde_json::{Map, Value}; use sqlx::SqlitePool; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; use tokio_util::sync::CancellationToken; use winnow::{ Parser, @@ -27,7 +27,6 @@ type Payload = Map; #[derive(Debug, Clone)] pub struct ServerState { db: SqlitePool, - client: reqwest::Client, email: String, token: String, announce_tx: UnboundedSender, @@ -39,11 +38,9 @@ impl ServerState { email: &str, token: &str, announce_tx: UnboundedSender, - client: reqwest::Client, ) -> Self { Self { db, - client, announce_tx, email: email.to_string(), token: token.to_string(), @@ -86,10 +83,7 @@ async fn handle_manage_feed( if state.email == bot_email && state.token == token { let ZulipMessage { - content, - sender_id, - sender_full_name, - .. + content, sender_id, .. } = message; let mut resp: HashMap<&str, String> = HashMap::new(); @@ -113,22 +107,36 @@ async fn handle_manage_feed( match command.action { Action::Add => { - match add_feed(&state.db, sender_id, command.feed, &state.client).await { - Ok(_) => { - let _ = state.announce_tx.send(NewFeed { - url: command.feed.to_string(), - owner: sender_full_name, - }); + let (tx, mut rx) = unbounded_channel(); + match state.announce_tx.send(NewFeed { + url: command.feed.to_string(), + owner: sender_id, + result_sender: tx, + }) { + Ok(_) => {} + Err(e) => { + tracing::error!("could not send add feed message to runloop: {e}"); + resp.insert("content", "oh no, something terrible happened!".to_string()); + } + } + + let s = rx.recv().await; + dbg!(&s); + match s { + Some(Ok(_)) => { + dbg!("success"); resp.insert("content", "Blogdor Says: SUCCESS!".to_string()); } - Err(e) => { - resp.insert("content", format!("Blogdor Says: OH NO! {e}")); + Some(Err(e)) => { + dbg!(&e); + resp.insert("content", format!("Blogdor Says: ERRORED! {e}")); } + None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), } } Action::Remove => match remove_feed(&state.db, sender_id, command.feed).await { Ok(_) => { - resp.insert("content", "Blogdor Says: BURNINATED!".to_string()); + resp.insert("content", "sorry, that's currently unsupported".to_string()); } Err(e) => { resp.insert("content", e); @@ -139,78 +147,36 @@ async fn handle_manage_feed( } } + dbg!(&resp); + Json(resp).into_response() } else { StatusCode::IM_A_TEAPOT.into_response() } } -async fn remove_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> { - sqlx::query!( - "update feeds set active = false, updated_at = current_timestamp where url = ? and added_by = ?", - feed, - user - ) - .execute(db) - .await - .map_err(|e| { - tracing::error!("could not set {feed} inactive by {user}, got {e}"); - "sorry buddy, Blogdor couldn't do that".to_string() - })?; - sqlx::query!( - "select * from feeds where added_by = ? and active = false", - user - ) - .fetch_one(db) - .await - .map_err(|e| { - tracing::error!("could not set {feed} inactive by {user}, got {e}"); - "sorry buddy, Blogdor couldn't do that".to_string() - })?; - Ok(()) -} - -async fn add_feed( - db: &SqlitePool, - user: u32, - feed: &str, - client: &reqwest::Client, -) -> Result<(), String> { - add_user(db, user).await?; - - let _ = crate::fetch_and_parse_feed(feed, client).await?; - - sqlx::query!( - "update feeds set active = true, updated_at = current_timestamp where url = ? and added_by = ?", - feed, - user - ) - .execute(db) - .await - .map_err(|e| format!("Got error adding feed: {e}"))?; - - if sqlx::query!( - "select * from feeds where added_by = ? and url = ? and active = true", - user, - feed - ) - .fetch_optional(db) - .await - .map_err(|e| format!("{e}"))? - .is_some() - { - return Ok(()); - } - - sqlx::query!( - "insert into feeds (url, added_by, active) values (?, ?, true)", - feed, - user - ) - .execute(db) - .await - .map_err(|e| format!("{e}"))?; - +async fn remove_feed(_db: &SqlitePool, _user: u32, _feed: &str) -> Result<(), String> { + // sqlx::query!( + // "update feeds set active = false, updated_at = current_timestamp where + // url = ? and added_by = ?", feed, + // user + // ) + // .execute(db) + // .await + // .map_err(|e| { + // tracing::error!("could not set {feed} inactive by {user}, got {e}"); + // "sorry buddy, Blogdor couldn't do that".to_string() + // })?; + // sqlx::query!( + // "select * from feeds where added_by = ? and active = false", + // user + // ) + // .fetch_one(db) + // .await + // .map_err(|e| { + // tracing::error!("could not set {feed} inactive by {user}, got {e}"); + // "sorry buddy, Blogdor couldn't do that".to_string() + // })?; Ok(()) } @@ -253,7 +219,6 @@ struct ManageFeedMessage { struct ZulipMessage { content: String, sender_id: u32, - sender_full_name: String, #[serde(flatten)] _rest: Payload, }