This commit is contained in:
Joe 2025-12-20 22:19:14 -08:00
parent 452e413ede
commit 1f109edb35
7 changed files with 151 additions and 35 deletions

View file

@ -1 +0,0 @@
ardent@pinkee.17538:1750983153

View file

@ -1 +1 @@
-- Add down migration script here DROP TABLE IF EXISTS users;

View file

@ -1,3 +1,4 @@
CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
zulip_id INT UNIQUE NOT NULL
); );

View file

@ -2,8 +2,8 @@ CREATE TABLE IF NOT EXISTS feeds (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
url TEXT UNIQUE NOT NULL, url TEXT UNIQUE NOT NULL,
added_by INT NOT NULL, added_by INT NOT NULL,
last_modified_by TEXT NOT NULL,
active BOOLEAN NOT NULL DEFAULT FALSE, active BOOLEAN NOT NULL DEFAULT FALSE,
created_at DATETIME NOT NULL DEFAULT current_timestamp, created_at DATETIME NOT NULL DEFAULT current_timestamp,
updated_at DATETIME NOT NULL DEFAULT current_timestamp updated_at DATETIME NOT NULL DEFAULT current_timestamp,
FOREIGN KEY (added_by) REFERENCES users(zulip_id)
); );

View file

@ -1,14 +1,14 @@
use std::time::Duration; use std::time::Duration;
use feed_rs::parser::parse; use feed_rs::parser::parse;
use reqwest::StatusCode; use reqwest::{Response, StatusCode};
use server::ServerState; use server::ServerState;
use sqlx::{ use sqlx::{
SqlitePool, SqlitePool,
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
types::chrono::{DateTime, Utc}, types::chrono::{DateTime, Utc},
}; };
use tokio::task::JoinSet; use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
use tokio_util::{bytes::Buf, sync::CancellationToken}; use tokio_util::{bytes::Buf, sync::CancellationToken};
use unicode_segmentation::UnicodeSegmentation; use unicode_segmentation::UnicodeSegmentation;
@ -55,6 +55,12 @@ pub struct FeedResult {
pub feed_id: i64, pub feed_id: i64,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewFeed {
feed: String,
user: String,
}
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)] #[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
struct ZulipMessage<'s> { struct ZulipMessage<'s> {
to: u32, to: u32,
@ -99,11 +105,12 @@ impl BlogdorTheAggregator {
self.cancel.cancelled().await self.cancel.cancelled().await
} }
pub async fn spawn_http(&self) { pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
let state = ServerState::new( let state = ServerState::new(
self.db.clone(), self.db.clone(),
&self.zulip_to_blogdor_email, &self.zulip_to_blogdor_email,
&self.blogdor_token, &self.blogdor_token,
announce_tx,
); );
server::spawn_server(state, self.cancel.clone()).await; server::spawn_server(state, self.cancel.clone()).await;
} }
@ -138,6 +145,26 @@ impl BlogdorTheAggregator {
Ok(feed_results) Ok(feed_results)
} }
pub async fn announce_feed(&self, announce: &NewFeed) {
let content = format!("{} added a new feed: {}", announce.user, announce.feed);
let msg = ZulipMessage {
to: self.channel_id,
typ: "stream",
content,
topic: Some("New feeds"),
};
match self.send_zulip_message(&msg).await {
Err(e) => {
tracing::error!("got error sending to zulip: {e}");
}
Ok(r) => {
if r.status() != StatusCode::OK {
tracing::warn!("did not successfully post to zulip: status {}", r.status());
}
}
}
}
// will also update the successful_runs table if it posts to zulip // will also update the successful_runs table if it posts to zulip
pub async fn post_entries(&self, posts: &[FeedEntry]) { pub async fn post_entries(&self, posts: &[FeedEntry]) {
let FeedEntry { let FeedEntry {
@ -165,17 +192,8 @@ impl BlogdorTheAggregator {
content, content,
topic: Some(&post.title), topic: Some(&post.title),
}; };
let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
match self match self.send_zulip_message(&msg).await {
.client
.post(&self.endpoint)
.basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token))
.body(msg)
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await
{
Err(e) => { Err(e) => {
tracing::error!("got error sending to zulip: {e}"); tracing::error!("got error sending to zulip: {e}");
success = false; success = false;
@ -207,6 +225,18 @@ impl BlogdorTheAggregator {
pub async fn close_db(&self) { pub async fn close_db(&self) {
self.db.close().await; self.db.close().await;
} }
async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result<Response, String> {
let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
self.client
.post(&self.endpoint)
.basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token))
.body(msg)
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await
.map_err(|e| format!("{e}"))
}
} }
// takes args by value because it's meant to be called from inside a spawned // takes args by value because it's meant to be called from inside a spawned

View file

@ -1,6 +1,7 @@
use std::time::Duration; use std::time::Duration;
use blogdor::BlogdorTheAggregator; use blogdor::{BlogdorTheAggregator, NewFeed};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
const BLOGDOR_SNOOZE: Duration = Duration::from_hours(1); const BLOGDOR_SNOOZE: Duration = Duration::from_hours(1);
@ -10,9 +11,10 @@ async fn main() {
init_logs(); init_logs();
let bta = BlogdorTheAggregator::new().await; let bta = BlogdorTheAggregator::new().await;
bta.spawn_http().await; let (tx, rx) = unbounded_channel();
bta.spawn_http(tx).await;
run_loop(&bta).await; run_loop(&bta, rx).await;
bta.close_db().await; bta.close_db().await;
@ -29,11 +31,16 @@ fn init_logs() {
.init(); .init();
} }
async fn run_loop(bta: &BlogdorTheAggregator) { async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE); let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE);
loop { loop {
tokio::select! { tokio::select! {
biased; biased;
announce = announce_rx.recv() => {
if let Some(announce) = announce {
bta.announce_feed(&announce).await;
}
}
_ = alarm.tick() => { _ = alarm.tick() => {
match bta.check_feeds().await { match bta.check_feeds().await {
Ok(results) => { Ok(results) => {

View file

@ -10,15 +10,18 @@ use axum::{
use serde::Deserialize; use serde::Deserialize;
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use winnow::{ use winnow::{
Parser, Parser,
ascii::space0, ascii::{newline, space0},
combinator::{alt, fail}, combinator::{alt, eof, fail},
error::StrContext, error::StrContext,
token::take_while, token::take_while,
}; };
use crate::NewFeed;
type Payload = Map<String, Value>; type Payload = Map<String, Value>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -26,12 +29,19 @@ pub struct ServerState {
db: SqlitePool, db: SqlitePool,
email: String, email: String,
token: String, token: String,
announce_tx: UnboundedSender<NewFeed>,
} }
impl ServerState { impl ServerState {
pub fn new(db: SqlitePool, email: &str, token: &str) -> Self { pub fn new(
db: SqlitePool,
email: &str,
token: &str,
announce_tx: UnboundedSender<NewFeed>,
) -> Self {
Self { Self {
db, db,
announce_tx,
email: email.to_string(), email: email.to_string(),
token: token.to_string(), token: token.to_string(),
} }
@ -70,13 +80,10 @@ async fn handle_manage_feed(
message, message,
_rest: _, _rest: _,
} = request; } = request;
tracing::debug!("email: {bot_email}, token: {token}");
if state.email == bot_email && state.token == token {
tracing::debug!("gonna do a thing with {message:?}");
if state.email == bot_email && state.token == token {
let ZulipMessage { let ZulipMessage {
mut content, content,
ref sender_email,
sender_id, sender_id,
sender_full_name, sender_full_name,
.. ..
@ -87,18 +94,83 @@ async fn handle_manage_feed(
let command = parse_command(&mut content.as_str()); let command = parse_command(&mut content.as_str());
let command = match command { let command = match command {
Err(e) => { Err(e) => {
resp["content"] = format!("I did not understand: {e}"); resp.insert("content", e.to_string());
return Json(resp).into_response(); return Json(resp).into_response();
} }
Ok(c) => c, Ok(c) => c,
}; };
tracing::debug!(command = ?command);
match command.action {
Action::Add => match add_feed(&state.db, sender_id, command.feed).await {
Ok(_) => {
let _ = state.announce_tx.send(NewFeed {
feed: command.feed.to_string(),
user: sender_full_name,
});
resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
}
Err(e) => {
resp.insert("content", format!("Blogdor Says: OH NO! {e}"));
}
},
Action::Remove => return StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS.into_response(),
Action::Help => {
resp.insert("content", "DM or `@blogdor's manager` with `add <feed url, RSS or Atom XML files>`, `remove <feed url originally added by you>`, or `help` to get this message (duh).".to_string());
}
}
Json(resp).into_response() Json(resp).into_response()
} else { } else {
StatusCode::IM_A_TEAPOT.into_response() StatusCode::IM_A_TEAPOT.into_response()
} }
} }
async fn add_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> {
add_user(db, user).await?;
sqlx::query!(
"insert into feeds (url, added_by, active) values (?, ?, true)",
feed,
user
)
.execute(db)
.await
.map_err(|e| format!("{e}"))?;
Ok(())
}
async fn add_user(db: &SqlitePool, user: u32) -> Result<(), String> {
if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user)
.execute(db)
.await
{
match e {
sqlx::Error::Database(database_error) => {
// the users table has only one constraint, which is a uniqueness one on
// zulip_id, so if it's violated, we don't care, it just means we already have
// that user; if it's not a constraint violation, then something
// else and bad has happened
if database_error.constraint().is_some() {
return Ok(());
}
}
sqlx::Error::Io(error) => {
tracing::error!("got IO error: {error}");
return Err("you should maybe retry that".to_string());
}
_ => return Err("yikes".to_string()),
}
}
Ok(())
}
async fn _remove_feed() -> Result<(), String> {
todo!()
}
async fn graceful_shutdown(cancel: CancellationToken) { async fn graceful_shutdown(cancel: CancellationToken) {
use tokio::signal; use tokio::signal;
let ctrl_c = async { let ctrl_c = async {
@ -137,26 +209,28 @@ struct ManageFeedMessage {
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
struct ZulipMessage { struct ZulipMessage {
content: String, content: String,
sender_email: String,
sender_id: u32, sender_id: u32,
sender_full_name: String, sender_full_name: String,
#[serde(flatten)] #[serde(flatten)]
_rest: Payload, _rest: Payload,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action { enum Action {
Add, Add,
Remove, Remove,
Help, Help,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FeedCommand<'req> { struct FeedCommand<'req> {
feed: &'req str, feed: &'req str,
action: Action, action: Action,
} }
fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> { fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> {
let _ = "**@blogdor's manager**".parse_next(input)?; let _ = alt(("@**blogdor's manager**", space0)).parse_next(input)?;
let action = ( let action = (
space0, space0,
alt(( alt((
@ -176,8 +250,13 @@ fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> {
}) })
.parse_next(input)?; .parse_next(input)?;
let feed = (space0, take_while(0.., |c: char| !c.is_whitespace())) let feed = (
.map(|(_, f)| f) space0,
take_while(0.., |c: char| !c.is_whitespace()),
space0,
alt((eof.void(), newline.void())),
)
.map(|(_, f, _, _)| f)
.parse_next(input)?; .parse_next(input)?;
Ok(FeedCommand { feed, action }) Ok(FeedCommand { feed, action })
} }