diff --git a/migrations/.#0001_users.up.sql b/migrations/.#0001_users.up.sql deleted file mode 120000 index ee098d3..0000000 --- a/migrations/.#0001_users.up.sql +++ /dev/null @@ -1 +0,0 @@ -ardent@pinkee.17538:1750983153 \ No newline at end of file diff --git a/migrations/0001_users.down.sql b/migrations/0001_users.down.sql index d2f607c..c99ddcd 100644 --- a/migrations/0001_users.down.sql +++ b/migrations/0001_users.down.sql @@ -1 +1 @@ --- Add down migration script here +DROP TABLE IF EXISTS users; diff --git a/migrations/0001_users.up.sql b/migrations/0001_users.up.sql index 9abfafa..aa0a16b 100644 --- a/migrations/0001_users.up.sql +++ b/migrations/0001_users.up.sql @@ -1,3 +1,4 @@ CREATE TABLE IF NOT EXISTS users ( - + id INTEGER PRIMARY KEY, + zulip_id INT UNIQUE NOT NULL ); diff --git a/migrations/0002_feeds.up.sql b/migrations/0002_feeds.up.sql index a48cd8d..62dff28 100644 --- a/migrations/0002_feeds.up.sql +++ b/migrations/0002_feeds.up.sql @@ -2,8 +2,8 @@ CREATE TABLE IF NOT EXISTS feeds ( id INTEGER PRIMARY KEY, url TEXT UNIQUE NOT NULL, added_by INT NOT NULL, - last_modified_by TEXT NOT NULL, active BOOLEAN NOT NULL DEFAULT FALSE, created_at DATETIME NOT NULL DEFAULT current_timestamp, - updated_at DATETIME NOT NULL DEFAULT current_timestamp + updated_at DATETIME NOT NULL DEFAULT current_timestamp, + FOREIGN KEY (added_by) REFERENCES users(zulip_id) ); diff --git a/src/lib.rs b/src/lib.rs index 7ff5f67..7a1f5ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,14 @@ use std::time::Duration; use feed_rs::parser::parse; -use reqwest::StatusCode; +use reqwest::{Response, StatusCode}; use server::ServerState; use sqlx::{ SqlitePool, sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, types::chrono::{DateTime, Utc}, }; -use tokio::task::JoinSet; +use tokio::{sync::mpsc::UnboundedSender, task::JoinSet}; use tokio_util::{bytes::Buf, sync::CancellationToken}; use unicode_segmentation::UnicodeSegmentation; @@ -55,6 +55,12 @@ pub struct FeedResult { pub feed_id: i64, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NewFeed { + feed: String, + user: String, +} + #[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)] struct ZulipMessage<'s> { to: u32, @@ -99,11 +105,12 @@ impl BlogdorTheAggregator { self.cancel.cancelled().await } - pub async fn spawn_http(&self) { + pub async fn spawn_http(&self, announce_tx: UnboundedSender) { let state = ServerState::new( self.db.clone(), &self.zulip_to_blogdor_email, &self.blogdor_token, + announce_tx, ); server::spawn_server(state, self.cancel.clone()).await; } @@ -138,6 +145,26 @@ impl BlogdorTheAggregator { Ok(feed_results) } + pub async fn announce_feed(&self, announce: &NewFeed) { + let content = format!("{} added a new feed: {}", announce.user, announce.feed); + let msg = ZulipMessage { + to: self.channel_id, + typ: "stream", + content, + topic: Some("New feeds"), + }; + match self.send_zulip_message(&msg).await { + Err(e) => { + tracing::error!("got error sending to zulip: {e}"); + } + Ok(r) => { + if r.status() != StatusCode::OK { + tracing::warn!("did not successfully post to zulip: status {}", r.status()); + } + } + } + } + // will also update the successful_runs table if it posts to zulip pub async fn post_entries(&self, posts: &[FeedEntry]) { let FeedEntry { @@ -165,17 +192,8 @@ impl BlogdorTheAggregator { content, topic: Some(&post.title), }; - let msg = serde_urlencoded::to_string(msg).expect("serialize msg"); - match self - .client - .post(&self.endpoint) - .basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token)) - .body(msg) - .header("Content-Type", "application/x-www-form-urlencoded") - .send() - .await - { + match self.send_zulip_message(&msg).await { Err(e) => { tracing::error!("got error sending to zulip: {e}"); success = false; @@ -207,6 +225,18 @@ impl BlogdorTheAggregator { pub async fn close_db(&self) { self.db.close().await; } + + async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result { + let msg = serde_urlencoded::to_string(msg).expect("serialize msg"); + self.client + .post(&self.endpoint) + .basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token)) + .body(msg) + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await + .map_err(|e| format!("{e}")) + } } // takes args by value because it's meant to be called from inside a spawned diff --git a/src/main.rs b/src/main.rs index 1cc7ff1..3bfe76e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use std::time::Duration; -use blogdor::BlogdorTheAggregator; +use blogdor::{BlogdorTheAggregator, NewFeed}; +use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const BLOGDOR_SNOOZE: Duration = Duration::from_hours(1); @@ -10,9 +11,10 @@ async fn main() { init_logs(); let bta = BlogdorTheAggregator::new().await; - bta.spawn_http().await; + let (tx, rx) = unbounded_channel(); + bta.spawn_http(tx).await; - run_loop(&bta).await; + run_loop(&bta, rx).await; bta.close_db().await; @@ -29,11 +31,16 @@ fn init_logs() { .init(); } -async fn run_loop(bta: &BlogdorTheAggregator) { +async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver) { let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE); loop { tokio::select! { biased; + announce = announce_rx.recv() => { + if let Some(announce) = announce { + bta.announce_feed(&announce).await; + } + } _ = alarm.tick() => { match bta.check_feeds().await { Ok(results) => { diff --git a/src/server.rs b/src/server.rs index 1c74967..8445d49 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,15 +10,18 @@ use axum::{ use serde::Deserialize; use serde_json::{Map, Value}; use sqlx::SqlitePool; +use tokio::sync::mpsc::UnboundedSender; use tokio_util::sync::CancellationToken; use winnow::{ Parser, - ascii::space0, - combinator::{alt, fail}, + ascii::{newline, space0}, + combinator::{alt, eof, fail}, error::StrContext, token::take_while, }; +use crate::NewFeed; + type Payload = Map; #[derive(Debug, Clone)] @@ -26,12 +29,19 @@ pub struct ServerState { db: SqlitePool, email: String, token: String, + announce_tx: UnboundedSender, } impl ServerState { - pub fn new(db: SqlitePool, email: &str, token: &str) -> Self { + pub fn new( + db: SqlitePool, + email: &str, + token: &str, + announce_tx: UnboundedSender, + ) -> Self { Self { db, + announce_tx, email: email.to_string(), token: token.to_string(), } @@ -70,13 +80,10 @@ async fn handle_manage_feed( message, _rest: _, } = request; - tracing::debug!("email: {bot_email}, token: {token}"); - if state.email == bot_email && state.token == token { - tracing::debug!("gonna do a thing with {message:?}"); + if state.email == bot_email && state.token == token { let ZulipMessage { - mut content, - ref sender_email, + content, sender_id, sender_full_name, .. @@ -87,18 +94,83 @@ async fn handle_manage_feed( let command = parse_command(&mut content.as_str()); let command = match command { Err(e) => { - resp["content"] = format!("I did not understand: {e}"); + resp.insert("content", e.to_string()); return Json(resp).into_response(); } Ok(c) => c, }; + tracing::debug!(command = ?command); + + match command.action { + Action::Add => match add_feed(&state.db, sender_id, command.feed).await { + Ok(_) => { + let _ = state.announce_tx.send(NewFeed { + feed: command.feed.to_string(), + user: sender_full_name, + }); + resp.insert("content", "Blogdor Says: SUCCESS!".to_string()); + } + Err(e) => { + resp.insert("content", format!("Blogdor Says: OH NO! {e}")); + } + }, + Action::Remove => return StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS.into_response(), + Action::Help => { + resp.insert("content", "DM or `@blogdor's manager` with `add `, `remove `, or `help` to get this message (duh).".to_string()); + } + } + Json(resp).into_response() } else { StatusCode::IM_A_TEAPOT.into_response() } } +async fn add_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> { + add_user(db, user).await?; + + sqlx::query!( + "insert into feeds (url, added_by, active) values (?, ?, true)", + feed, + user + ) + .execute(db) + .await + .map_err(|e| format!("{e}"))?; + + Ok(()) +} + +async fn add_user(db: &SqlitePool, user: u32) -> Result<(), String> { + if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user) + .execute(db) + .await + { + match e { + sqlx::Error::Database(database_error) => { + // the users table has only one constraint, which is a uniqueness one on + // zulip_id, so if it's violated, we don't care, it just means we already have + // that user; if it's not a constraint violation, then something + // else and bad has happened + if database_error.constraint().is_some() { + return Ok(()); + } + } + sqlx::Error::Io(error) => { + tracing::error!("got IO error: {error}"); + return Err("you should maybe retry that".to_string()); + } + _ => return Err("yikes".to_string()), + } + } + Ok(()) +} + +async fn _remove_feed() -> Result<(), String> { + todo!() +} + async fn graceful_shutdown(cancel: CancellationToken) { use tokio::signal; let ctrl_c = async { @@ -137,26 +209,28 @@ struct ManageFeedMessage { #[derive(Clone, Debug, PartialEq, Eq, Deserialize)] struct ZulipMessage { content: String, - sender_email: String, sender_id: u32, sender_full_name: String, #[serde(flatten)] _rest: Payload, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Action { Add, Remove, Help, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] struct FeedCommand<'req> { feed: &'req str, action: Action, } fn parse_command<'i>(input: &mut &'i str) -> winnow::Result> { - let _ = "**@blogdor's manager**".parse_next(input)?; + let _ = alt(("@**blogdor's manager**", space0)).parse_next(input)?; + let action = ( space0, alt(( @@ -176,8 +250,13 @@ fn parse_command<'i>(input: &mut &'i str) -> winnow::Result> { }) .parse_next(input)?; - let feed = (space0, take_while(0.., |c: char| !c.is_whitespace())) - .map(|(_, f)| f) + let feed = ( + space0, + take_while(0.., |c: char| !c.is_whitespace()), + space0, + alt((eof.void(), newline.void())), + ) + .map(|(_, f, _, _)| f) .parse_next(input)?; Ok(FeedCommand { feed, action }) }