From 45a56b9fe97c3b915238746df72cef9f912d16f4 Mon Sep 17 00:00:00 2001 From: Joe Date: Thu, 1 Jan 2026 23:12:14 -0800 Subject: [PATCH] support more than adds --- src/lib.rs | 97 +++++++++++++++++++++++++-------------- src/main.rs | 10 ++-- src/server.rs | 124 ++++++++++++-------------------------------------- 3 files changed, 99 insertions(+), 132 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b8388d9..997f272 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,23 +55,36 @@ pub struct FeedEntry { body: Option, } -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct FeedResult { pub entries: Option>, pub url: String, pub feed_id: i64, } -#[derive(Debug, Clone)] -pub struct NewFeed { - url: String, - owner: u32, - result_sender: UnboundedSender>, +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FeedCommand { + feed: String, + action: Action, } -impl PartialEq for NewFeed { +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Action { + Add, + Remove, + Help, +} + +#[derive(Debug, Clone)] +pub struct UserRequest { + command: FeedCommand, + owner: u32, + result_sender: UnboundedSender>, +} + +impl PartialEq for UserRequest { fn eq(&self, other: &Self) -> bool { - self.url == other.url && self.owner == other.owner + self.command == other.command && self.owner == other.owner } } @@ -149,12 +162,11 @@ impl BlogdorTheAggregator { self.cancel.cancelled().await } - pub async fn spawn_http(&self, announce_tx: UnboundedSender) { + pub async fn spawn_http(&self, user_request_tx: UnboundedSender) { let state = ServerState::new( - self.db.clone(), &self.zulip_to_blogdor_email, &self.blogdor_token, - announce_tx, + user_request_tx, ); server::spawn_server(state, self.cancel.clone()).await; } @@ -233,21 +245,40 @@ impl BlogdorTheAggregator { } } - pub async fn add_feed(&self, add_request: &NewFeed) { - if let Err(e) = self.add_user(add_request.owner).await { - let _ = add_request.result_sender.send(Err(e)); + pub async fn process_user_request(&self, user_request: &UserRequest) { + match user_request.command.action { + Action::Add => self.add_feed(user_request).await, + Action::Help => user_request.result_sender.send(Ok("DM or `@blogdor's manager` with `add `, `remove `, or `help` to get this message (duh).".to_string())).unwrap_or_default(), + Action::Remove => self.remove_feed(user_request).await, + } + } + + async fn remove_feed(&self, remove_request: &UserRequest) { + let _ = remove_request + .result_sender + .send(Err("currently unsupported".to_string())); + } + + async fn add_feed(&self, user_request: &UserRequest) { + if let Err(e) = self.add_user(user_request.owner).await { + let _ = user_request.result_sender.send(Err(e)); return; } - if let Err(e) = crate::fetch_and_parse_feed(&add_request.url, &self.client).await { - let _ = add_request.result_sender.send(Err(e)); + let url = &user_request.command.feed; + let owner = user_request.owner; + + let msg; + + if let Err(e) = crate::fetch_and_parse_feed(url, &self.client).await { + let _ = user_request.result_sender.send(Err(e)); return; } if let Some(id) = match sqlx::query!( "select id from feeds where owner = ? and url = ?", - add_request.url, - add_request.owner + owner, + url ) .fetch_optional(&self.db) .await @@ -255,7 +286,7 @@ impl BlogdorTheAggregator { Ok(r) => r.map(|r| r.id), Err(e) => { tracing::error!("couldn't fetch an optional row from the feeds table: {e}"); - let _ = add_request + let _ = user_request .result_sender .send(Err("whoa, something weird and bad happened".to_string())); return; @@ -266,17 +297,19 @@ impl BlogdorTheAggregator { .await { tracing::error!("got error inserting into status: {e}"); - let _ = add_request.result_sender.send(Err(format!( + let _ = user_request.result_sender.send(Err(format!( "could not activate previously added feed at {} for Zulip user {}", - &add_request.url, add_request.owner + url, owner ))); + return; } + msg = format!("marked previously added feed at {url} as active"); } else { let txn = match self.db.begin().await { Ok(txn) => txn, Err(e) => { tracing::error!("got error begining a transaction: {e}"); - let _ = add_request + let _ = user_request .result_sender .send(Err("could not add feed".to_string())); return; @@ -285,15 +318,15 @@ impl BlogdorTheAggregator { // get the ID for the feed let id = match sqlx::query!( "insert into feeds (url, owner) values (?, ?) returning id", - add_request.url, - add_request.owner + url, + owner ) .fetch_one(&self.db) .await { Err(e) => { tracing::error!("error inserting into feeds: {e}"); - let _ = add_request + let _ = user_request .result_sender .send(Err("could not add feed".to_string())); return; @@ -307,7 +340,7 @@ impl BlogdorTheAggregator { .await { tracing::error!("error inserting into status: {e}"); - let _ = add_request + let _ = user_request .result_sender .send(Err("could not add feed".to_string())); return; @@ -320,7 +353,7 @@ impl BlogdorTheAggregator { .await { tracing::error!("error inserting into runs: {e}"); - let _ = add_request + let _ = user_request .result_sender .send(Err("could not add feed".to_string())); return; @@ -329,18 +362,16 @@ impl BlogdorTheAggregator { // woo! if let Err(e) = txn.commit().await { tracing::error!("error committing add-feed transaction: {e}"); - let _ = add_request + let _ = user_request .result_sender .send(Err("could not add feed".to_string())); return; } + msg = format!("added new feed at {url}"); } - let _ = add_request.result_sender.send(Ok(())); + let _ = user_request.result_sender.send(Ok(msg)); - let content = format!( - "@**|{}**: added a new feed: {}", - add_request.owner, add_request.url - ); + let content = format!("@**|{}**: added a new feed: {}", owner, url); let msg = ZulipMessage { to: self.channel_id, typ: MessageType::Stream, diff --git a/src/main.rs b/src/main.rs index 7ecc3be..dabdd0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use blogdor::{BlogdorTheAggregator, NewFeed}; +use blogdor::{BlogdorTheAggregator, UserRequest}; use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -31,16 +31,16 @@ fn init_logs() { .init(); } -async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver) { +async fn run_loop(bta: &BlogdorTheAggregator, mut user_req_rx: UnboundedReceiver) { let mut check_feeds = tokio::time::interval(BLOGDOR_SNOOZE); let mut check_stale = tokio::time::interval(Duration::from_hours(24)); loop { tokio::select! { biased; - announce = announce_rx.recv() => { - if let Some(announce) = announce { - bta.add_feed(&announce).await; + user_req = user_req_rx.recv() => { + if let Some(ureq) = user_req { + bta.process_user_request(&ureq).await; } } _ = check_feeds.tick() => { diff --git a/src/server.rs b/src/server.rs index d1f93c8..2ee1c6b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -9,7 +9,6 @@ use axum::{ }; use serde::Deserialize; use serde_json::{Map, Value}; -use sqlx::SqlitePool; use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; use tokio_util::sync::CancellationToken; use winnow::{ @@ -20,28 +19,21 @@ use winnow::{ token::{literal, take_until, take_while}, }; -use crate::NewFeed; +use crate::{Action, FeedCommand, UserRequest}; type Payload = Map; #[derive(Debug, Clone)] pub struct ServerState { - db: SqlitePool, email: String, token: String, - announce_tx: UnboundedSender, + user_request_tx: UnboundedSender, } impl ServerState { - pub fn new( - db: SqlitePool, - email: &str, - token: &str, - announce_tx: UnboundedSender, - ) -> Self { + pub fn new(email: &str, token: &str, user_request_tx: UnboundedSender) -> Self { Self { - db, - announce_tx, + user_request_tx, email: email.to_string(), token: token.to_string(), } @@ -105,49 +97,28 @@ async fn handle_manage_feed( tracing::debug!(command = ?command); - match command.action { - Action::Add => { - let (tx, mut rx) = unbounded_channel(); - match state.announce_tx.send(NewFeed { - url: command.feed.to_string(), - owner: sender_id, - result_sender: tx, - }) { - Ok(_) => {} - Err(e) => { - tracing::error!("could not send add feed message to runloop: {e}"); - resp.insert("content", "oh no, something terrible happened!".to_string()); - } - } - - let s = rx.recv().await; - dbg!(&s); - match s { - Some(Ok(_)) => { - dbg!("success"); - resp.insert("content", "Blogdor Says: SUCCESS!".to_string()); - } - Some(Err(e)) => { - dbg!(&e); - resp.insert("content", format!("Blogdor Says: ERRORED! {e}")); - } - None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), - } - } - Action::Remove => match remove_feed(&state.db, sender_id, command.feed).await { - Ok(_) => { - resp.insert("content", "sorry, that's currently unsupported".to_string()); - } - Err(e) => { - resp.insert("content", e); - } - }, - Action::Help => { - resp.insert("content", "DM or `@blogdor's manager` with `add `, `remove `, or `help` to get this message (duh).".to_string()); + let (tx, mut rx) = unbounded_channel(); + match state.user_request_tx.send(UserRequest { + command, + owner: sender_id, + result_sender: tx, + }) { + Ok(_) => {} + Err(e) => { + tracing::error!("could not send add feed message to runloop: {e}"); + resp.insert("content", "oh no, something terrible happened!".to_string()); } } - dbg!(&resp); + match rx.recv().await { + Some(Ok(r)) => { + resp.insert("content", format!("Blogdor Says: {r}")); + } + Some(Err(e)) => { + resp.insert("content", format!("Blogdor Says: ERRORED! {e}")); + } + None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + } Json(resp).into_response() } else { @@ -155,31 +126,6 @@ async fn handle_manage_feed( } } -async fn remove_feed(_db: &SqlitePool, _user: u32, _feed: &str) -> Result<(), String> { - // sqlx::query!( - // "update feeds set active = false, updated_at = current_timestamp where - // url = ? and added_by = ?", feed, - // user - // ) - // .execute(db) - // .await - // .map_err(|e| { - // tracing::error!("could not set {feed} inactive by {user}, got {e}"); - // "sorry buddy, Blogdor couldn't do that".to_string() - // })?; - // sqlx::query!( - // "select * from feeds where added_by = ? and active = false", - // user - // ) - // .fetch_one(db) - // .await - // .map_err(|e| { - // tracing::error!("could not set {feed} inactive by {user}, got {e}"); - // "sorry buddy, Blogdor couldn't do that".to_string() - // })?; - Ok(()) -} - async fn graceful_shutdown(cancel: CancellationToken) { use tokio::signal; let ctrl_c = async { @@ -223,20 +169,7 @@ struct ZulipMessage { _rest: Payload, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum Action { - Add, - Remove, - Help, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct FeedCommand<'req> { - feed: &'req str, - action: Action, -} - -fn parse_command<'i>(input: &mut &'i str) -> winnow::Result>> { +fn parse_command<'i>(input: &mut &'i str) -> winnow::Result> { let s = take_until::<_, _, ()>(0.., "@**blogdor's manager**").parse_next(input); match s { Err(_) => {} @@ -276,7 +209,10 @@ fn parse_command<'i>(input: &mut &'i str) -> winnow::Result