everything seems to work, need to add tests

This commit is contained in:
Joe 2026-01-01 19:02:57 -08:00
parent 71e1086ff7
commit 18c8749f91
3 changed files with 168 additions and 93 deletions

View file

@ -0,0 +1,10 @@
{
"bot_email": "blogdor-outgoing-bot@zulip-host",
"token": "another_token",
"message": {
"sender_email": "sender-email",
"sender_id": 1,
"sender_full_name": "magoo",
"content": "@**blogdor's manager** add invalid"
}
}

View file

@ -23,7 +23,6 @@ const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24); const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);
const ADD_FEED_QUERY: &str = "";
const ACTIVE_FEEDS_QUERY: &str = r#"SELECT id, url, owner, created, fetched, posted, run, feed FROM feeds const ACTIVE_FEEDS_QUERY: &str = r#"SELECT id, url, owner, created, fetched, posted, run, feed FROM feeds
INNER JOIN INNER JOIN
(SELECT feed, MAX(id) _, run, fetched, posted FROM runs WHERE feed IN (SELECT feed, MAX(id) _, run, fetched, posted FROM runs WHERE feed IN
@ -66,7 +65,7 @@ pub struct FeedResult {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NewFeed { pub struct NewFeed {
url: String, url: String,
owner: String, owner: u32,
result_sender: UnboundedSender<Result<(), String>>, result_sender: UnboundedSender<Result<(), String>>,
} }
@ -100,7 +99,6 @@ pub struct ActiveFeed {
id: i64, id: i64,
owner: i64, owner: i64,
created: DateTime<Utc>, created: DateTime<Utc>,
updated: DateTime<Utc>,
#[sqlx(flatten)] #[sqlx(flatten)]
last_run: FeedRun, last_run: FeedRun,
} }
@ -157,7 +155,6 @@ impl BlogdorTheAggregator {
&self.zulip_to_blogdor_email, &self.zulip_to_blogdor_email,
&self.blogdor_token, &self.blogdor_token,
announce_tx, announce_tx,
self.client.clone(),
); );
server::spawn_server(state, self.cancel.clone()).await; server::spawn_server(state, self.cancel.clone()).await;
} }
@ -173,9 +170,8 @@ impl BlogdorTheAggregator {
for feed in feeds { for feed in feeds {
let id = feed.id; let id = feed.id;
let url = feed.url; let url = feed.url;
let created_at = feed.created;
let last_run = feed.last_run; let last_run = feed.last_run;
let last = last_run.posted.unwrap_or(created_at); let last = last_run.posted.unwrap_or(LAST_FETCHED);
handles.spawn(check_feed(self.client.clone(), id, url, last, feed.owner)); handles.spawn(check_feed(self.client.clone(), id, url, last, feed.owner));
} }
@ -237,10 +233,113 @@ impl BlogdorTheAggregator {
} }
} }
pub async fn add_feed(&self, announce: &NewFeed) { pub async fn add_feed(&self, add_request: &NewFeed) {
if let Err(e) = self.add_user(add_request.owner).await {
let _ = add_request.result_sender.send(Err(e));
return;
}
if let Err(e) = crate::fetch_and_parse_feed(&add_request.url, &self.client).await {
let _ = add_request.result_sender.send(Err(e));
return;
}
if let Some(id) = match sqlx::query!(
"select id from feeds where owner = ? and url = ?",
add_request.url,
add_request.owner
)
.fetch_optional(&self.db)
.await
{
Ok(r) => r.map(|r| r.id),
Err(e) => {
tracing::error!("couldn't fetch an optional row from the feeds table: {e}");
let _ = add_request
.result_sender
.send(Err("whoa, something weird and bad happened".to_string()));
return;
}
} {
if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id)
.execute(&self.db)
.await
{
tracing::error!("got error inserting into status: {e}");
let _ = add_request.result_sender.send(Err(format!(
"could not activate previously added feed at {} for Zulip user {}",
&add_request.url, add_request.owner
)));
}
} else {
let txn = match self.db.begin().await {
Ok(txn) => txn,
Err(e) => {
tracing::error!("got error begining a transaction: {e}");
let _ = add_request
.result_sender
.send(Err("could not add feed".to_string()));
return;
}
};
// get the ID for the feed
let id = match sqlx::query!(
"insert into feeds (url, owner) values (?, ?) returning id",
add_request.url,
add_request.owner
)
.fetch_one(&self.db)
.await
{
Err(e) => {
tracing::error!("error inserting into feeds: {e}");
let _ = add_request
.result_sender
.send(Err("could not add feed".to_string()));
return;
}
Ok(id) => id.id,
};
// add a row in status for the new feed
if let Err(e) = sqlx::query!("insert into status (feed, active) values (?, true)", id)
.execute(&self.db)
.await
{
tracing::error!("error inserting into status: {e}");
let _ = add_request
.result_sender
.send(Err("could not add feed".to_string()));
return;
}
// need one row in the runs table to allow the inner join in the active feeds
// query to work
if let Err(e) = sqlx::query!("insert into runs (feed) values (?)", id)
.execute(&self.db)
.await
{
tracing::error!("error inserting into runs: {e}");
let _ = add_request
.result_sender
.send(Err("could not add feed".to_string()));
return;
}
// woo!
if let Err(e) = txn.commit().await {
tracing::error!("error committing add-feed transaction: {e}");
let _ = add_request
.result_sender
.send(Err("could not add feed".to_string()));
return;
}
}
let _ = add_request.result_sender.send(Ok(()));
let content = format!( let content = format!(
"@**|{}**: added a new feed: {}", "@**|{}**: added a new feed: {}",
announce.owner, announce.url add_request.owner, add_request.url
); );
let msg = ZulipMessage { let msg = ZulipMessage {
to: self.channel_id, to: self.channel_id,
@ -331,7 +430,8 @@ impl BlogdorTheAggregator {
let db_posted = db_posted.and_then(|p| p.posted.map(|p| p.and_utc())); let db_posted = db_posted.and_then(|p| p.posted.map(|p| p.and_utc()));
let posted = posted.or(db_posted); let posted = posted.or(db_posted);
if let Err(e) = sqlx::query!( if let Err(e) = sqlx::query!(
"insert into runs (fetched, posted) values (?, ?)", "insert into runs (feed, fetched, posted) values (?, ?, ?)",
feed,
fetched, fetched,
posted posted
) )

View file

@ -10,7 +10,7 @@ use axum::{
use serde::Deserialize; use serde::Deserialize;
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use winnow::{ use winnow::{
Parser, Parser,
@ -27,7 +27,6 @@ type Payload = Map<String, Value>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ServerState { pub struct ServerState {
db: SqlitePool, db: SqlitePool,
client: reqwest::Client,
email: String, email: String,
token: String, token: String,
announce_tx: UnboundedSender<NewFeed>, announce_tx: UnboundedSender<NewFeed>,
@ -39,11 +38,9 @@ impl ServerState {
email: &str, email: &str,
token: &str, token: &str,
announce_tx: UnboundedSender<NewFeed>, announce_tx: UnboundedSender<NewFeed>,
client: reqwest::Client,
) -> Self { ) -> Self {
Self { Self {
db, db,
client,
announce_tx, announce_tx,
email: email.to_string(), email: email.to_string(),
token: token.to_string(), token: token.to_string(),
@ -86,10 +83,7 @@ async fn handle_manage_feed(
if state.email == bot_email && state.token == token { if state.email == bot_email && state.token == token {
let ZulipMessage { let ZulipMessage {
content, content, sender_id, ..
sender_id,
sender_full_name,
..
} = message; } = message;
let mut resp: HashMap<&str, String> = HashMap::new(); let mut resp: HashMap<&str, String> = HashMap::new();
@ -113,22 +107,36 @@ async fn handle_manage_feed(
match command.action { match command.action {
Action::Add => { Action::Add => {
match add_feed(&state.db, sender_id, command.feed, &state.client).await { let (tx, mut rx) = unbounded_channel();
Ok(_) => { match state.announce_tx.send(NewFeed {
let _ = state.announce_tx.send(NewFeed {
url: command.feed.to_string(), url: command.feed.to_string(),
owner: sender_full_name, owner: sender_id,
}); result_sender: tx,
}) {
Ok(_) => {}
Err(e) => {
tracing::error!("could not send add feed message to runloop: {e}");
resp.insert("content", "oh no, something terrible happened!".to_string());
}
}
let s = rx.recv().await;
dbg!(&s);
match s {
Some(Ok(_)) => {
dbg!("success");
resp.insert("content", "Blogdor Says: SUCCESS!".to_string()); resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
} }
Err(e) => { Some(Err(e)) => {
resp.insert("content", format!("Blogdor Says: OH NO! {e}")); dbg!(&e);
resp.insert("content", format!("Blogdor Says: ERRORED! {e}"));
} }
None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
} }
} }
Action::Remove => match remove_feed(&state.db, sender_id, command.feed).await { Action::Remove => match remove_feed(&state.db, sender_id, command.feed).await {
Ok(_) => { Ok(_) => {
resp.insert("content", "Blogdor Says: BURNINATED!".to_string()); resp.insert("content", "sorry, that's currently unsupported".to_string());
} }
Err(e) => { Err(e) => {
resp.insert("content", e); resp.insert("content", e);
@ -139,78 +147,36 @@ async fn handle_manage_feed(
} }
} }
dbg!(&resp);
Json(resp).into_response() Json(resp).into_response()
} else { } else {
StatusCode::IM_A_TEAPOT.into_response() StatusCode::IM_A_TEAPOT.into_response()
} }
} }
async fn remove_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> { async fn remove_feed(_db: &SqlitePool, _user: u32, _feed: &str) -> Result<(), String> {
sqlx::query!( // sqlx::query!(
"update feeds set active = false, updated_at = current_timestamp where url = ? and added_by = ?", // "update feeds set active = false, updated_at = current_timestamp where
feed, // url = ? and added_by = ?", feed,
user // user
) // )
.execute(db) // .execute(db)
.await // .await
.map_err(|e| { // .map_err(|e| {
tracing::error!("could not set {feed} inactive by {user}, got {e}"); // tracing::error!("could not set {feed} inactive by {user}, got {e}");
"sorry buddy, Blogdor couldn't do that".to_string() // "sorry buddy, Blogdor couldn't do that".to_string()
})?; // })?;
sqlx::query!( // sqlx::query!(
"select * from feeds where added_by = ? and active = false", // "select * from feeds where added_by = ? and active = false",
user // user
) // )
.fetch_one(db) // .fetch_one(db)
.await // .await
.map_err(|e| { // .map_err(|e| {
tracing::error!("could not set {feed} inactive by {user}, got {e}"); // tracing::error!("could not set {feed} inactive by {user}, got {e}");
"sorry buddy, Blogdor couldn't do that".to_string() // "sorry buddy, Blogdor couldn't do that".to_string()
})?; // })?;
Ok(())
}
async fn add_feed(
db: &SqlitePool,
user: u32,
feed: &str,
client: &reqwest::Client,
) -> Result<(), String> {
add_user(db, user).await?;
let _ = crate::fetch_and_parse_feed(feed, client).await?;
sqlx::query!(
"update feeds set active = true, updated_at = current_timestamp where url = ? and added_by = ?",
feed,
user
)
.execute(db)
.await
.map_err(|e| format!("Got error adding feed: {e}"))?;
if sqlx::query!(
"select * from feeds where added_by = ? and url = ? and active = true",
user,
feed
)
.fetch_optional(db)
.await
.map_err(|e| format!("{e}"))?
.is_some()
{
return Ok(());
}
sqlx::query!(
"insert into feeds (url, added_by, active) values (?, ?, true)",
feed,
user
)
.execute(db)
.await
.map_err(|e| format!("{e}"))?;
Ok(()) Ok(())
} }
@ -253,7 +219,6 @@ struct ManageFeedMessage {
struct ZulipMessage { struct ZulipMessage {
content: String, content: String,
sender_id: u32, sender_id: u32,
sender_full_name: String,
#[serde(flatten)] #[serde(flatten)]
_rest: Payload, _rest: Payload,
} }