mostly done with the refactor

joe 2026-01-01 12:42:18 -08:00
parent b3cdf9f8ef
commit bb33e28849
11 changed files with 141 additions and 183 deletions

View file

@@ -1 +0,0 @@
-DROP TABLE IF EXISTS feeds;

View file

@@ -1,9 +0,0 @@
-CREATE TABLE IF NOT EXISTS feeds (
-    id INTEGER PRIMARY KEY,
-    url TEXT UNIQUE NOT NULL,
-    added_by INT NOT NULL,
-    active BOOLEAN NOT NULL DEFAULT FALSE,
-    created_at DATETIME NOT NULL DEFAULT current_timestamp,
-    updated_at DATETIME NOT NULL DEFAULT current_timestamp,
-    FOREIGN KEY (added_by) REFERENCES users(zulip_id)
-);

View file

@@ -0,0 +1,2 @@
+DROP TABLE IF EXISTS status;
+DROP TABLE IF EXISTS feeds;

View file

@@ -0,0 +1,16 @@
+CREATE TABLE IF NOT EXISTS feeds (
+    id INTEGER NOT NULL PRIMARY KEY,
+    url TEXT NOT NULL,
+    owner INT NOT NULL,
+    created DATETIME NOT NULL DEFAULT current_timestamp,
+    FOREIGN KEY (owner) REFERENCES users(zulip_id),
+    UNIQUE(url, owner)
+);
+
+CREATE TABLE IF NOT EXISTS status (
+    id INTEGER NOT NULL PRIMARY KEY,
+    feed INTEGER NOT NULL,
+    updated DATETIME NOT NULL DEFAULT current_timestamp,
+    active BOOLEAN NOT NULL DEFAULT FALSE,
+    FOREIGN KEY (feed) REFERENCES feeds(id)
+);
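Design note: the active flag moves off feeds and into the append-only status table, so activation changes become INSERTs and the newest status row per feed wins. A minimal sketch of that read, assuming the sqlx SqlitePool used elsewhere in this repo (is_active is illustrative, not part of this commit):

use sqlx::SqlitePool;

// sketch: current activation state = the newest status row for the feed
async fn is_active(db: &SqlitePool, feed_id: i64) -> Result<bool, sqlx::Error> {
    let active: Option<bool> =
        sqlx::query_scalar("SELECT active FROM status WHERE feed = ? ORDER BY id DESC LIMIT 1")
            .bind(feed_id)
            .fetch_optional(db)
            .await?;
    // no status rows yet: treat as inactive, matching DEFAULT FALSE above
    Ok(active.unwrap_or(false))
}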

View file

@@ -1,6 +1,8 @@
-CREATE TABLE IF NOT EXISTS successful_runs (
-    id INTEGER PRIMARY KEY,
-    date_time DATETIME NOT NULL DEFAULT current_timestamp,
+CREATE TABLE IF NOT EXISTS runs (
+    id INTEGER NOT NULL PRIMARY KEY,
+    run DATETIME NOT NULL DEFAULT current_timestamp,
     feed INTEGER NOT NULL,
+    fetched DATETIME,
+    posted DATETIME,
     FOREIGN KEY (feed) REFERENCES feeds(id)
 );
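The new fetched and posted columns fold the old fetches and successful_runs tables into one row per run: fetched records the attempt, posted stays NULL when nothing reached Zulip. A sketch of the intended write, under the same pool assumption (record_attempt is illustrative; the real insert lives in record_run below):

use sqlx::SqlitePool;
use sqlx::types::chrono::{DateTime, Utc};

// sketch: one row per run; posted is None for a fetch that posted nothing
async fn record_attempt(
    db: &SqlitePool,
    feed: i64,
    fetched: DateTime<Utc>,
    posted: Option<DateTime<Utc>>,
) -> Result<(), sqlx::Error> {
    sqlx::query("INSERT INTO runs (feed, fetched, posted) VALUES (?, ?, ?)")
        .bind(feed)
        .bind(fetched)
        .bind(posted)
        .execute(db)
        .await?;
    Ok(())
}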

View file

@@ -1 +0,0 @@
-DROP TABLE IF EXISTS fetches;

View file

@@ -1,6 +0,0 @@
-CREATE TABLE IF NOT EXISTS fetches (
-    id INTEGER PRIMARY KEY,
-    feed INT NOT NULL,
-    fetched DATETIME NOT NULL DEFAULT current_timestamp,
-    FOREIGN KEY (feed) REFERENCES feeds(id)
-);

View file

@@ -5,65 +5,16 @@ const TIMEOUT: u64 = 2000; // in milliseconds
 use std::time::Duration;
 
 use sqlx::{
-    Sqlite, SqlitePool,
-    query::Query,
-    sqlite::{
-        SqliteArguments, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteRow,
-    },
+    SqlitePool,
+    sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
 };
 
 use crate::BlogdorTheAggregator;
 
-pub enum DbAction<'q> {
-    Execute(Query<'q, Sqlite, SqliteArguments<'q>>),
-    FetchOne(Query<'q, Sqlite, SqliteArguments<'q>>),
-    FetchMany(Query<'q, Sqlite, SqliteArguments<'q>>),
-    FetchOptional(Query<'q, Sqlite, SqliteArguments<'q>>),
-}
-
-pub enum DbValue {
-    None,
-    Optional(Option<SqliteRow>),
-    One(SqliteRow),
-    Many(Vec<SqliteRow>),
-}
-
 impl BlogdorTheAggregator {
     pub async fn close_db(&self) {
         self.db.close().await;
     }
-
-    pub async fn db_action<'q>(&self, query: DbAction<'q>) -> Result<DbValue, String> {
-        match query {
-            DbAction::Execute(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                q.execute(&self.db).await.map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::None)
-            }
-            DbAction::FetchOne(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                let r = q.fetch_one(&self.db).await.map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::One(r))
-            }
-            DbAction::FetchMany(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                let r = q.fetch_all(&self.db).await.map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::Many(r))
-            }
-            DbAction::FetchOptional(q) => {
-                let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
-                let r = q
-                    .fetch_optional(&self.db)
-                    .await
-                    .map_err(|e| format!("{e}"))?;
-                t.commit().await.map_err(|e| format!("{e}"))?;
-                Ok(DbValue::Optional(r))
-            }
-        }
-    }
 }
 
 pub async fn get_db_pool() -> SqlitePool {
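The deleted DbAction/DbValue indirection (which also wrapped every single-statement query in its own transaction) is replaced by calling sqlx directly and mapping rows into structs. A sketch of the replacement pattern, assuming the same sqlite pool (Feed here is illustrative, not a type from this commit):

use sqlx::{FromRow, SqlitePool};

#[derive(Debug, FromRow)]
struct Feed {
    id: i64,
    url: String,
}

// sketch: query_as + FromRow yields typed rows with no DbValue unwrapping
async fn all_feeds(db: &SqlitePool) -> Result<Vec<Feed>, sqlx::Error> {
    sqlx::query_as::<_, Feed>("SELECT id, url FROM feeds")
        .fetch_all(db)
        .await
}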

View file

@@ -5,7 +5,7 @@ use reqwest::{Client, Response, StatusCode};
 use serde::{Deserialize, Serialize};
 use server::ServerState;
 use sqlx::{
-    FromRow, Row, SqlitePool,
+    FromRow, SqlitePool,
     types::chrono::{DateTime, Utc},
 };
 use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
@@ -13,7 +13,6 @@ use tokio_util::{bytes::Buf, sync::CancellationToken};
 use unicode_segmentation::UnicodeSegmentation;
 
 mod db;
-use db::{DbAction, DbValue};
 
 pub mod server;
@@ -25,11 +24,12 @@ const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
 const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);
 
 const ADD_FEED_QUERY: &str = "";
-const ACTIVE_FEEDS_QUERY: &str = "select id, url, created_at from feeds where active = true";
-const STALE_FEEDS_QUERY: &str =
-    "select id, url, added_by, created_at from feeds where active = true";
-const FETCH_RUN_QUERY: &str =
-    "select date_time from successful_runs where feed = ? order by id desc limit 1";
+const ACTIVE_FEEDS_QUERY: &str = r#"SELECT id, url, owner, created, fetched, posted, run, feed FROM feeds
+INNER JOIN
+    (SELECT feed, MAX(id) _, run, fetched, posted FROM runs WHERE feed IN
+        (SELECT feed FROM (SELECT feed, MAX(id), active FROM status GROUP BY feed) WHERE active = TRUE)
+    GROUP BY feed) r
+ON feeds.id = r.feed"#;
 
 pub struct BlogdorTheAggregator {
     db: SqlitePool,
@@ -48,6 +48,7 @@ pub struct FeedEntry {
     post_url: String,
     feed_url: String,
     feed_id: i64,
+    owner: i64,
     title: String,
     published: DateTime<Utc>,
     received: DateTime<Utc>,
@@ -62,10 +63,17 @@ pub struct FeedResult {
     pub feed_id: i64,
 }
 
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone)]
 pub struct NewFeed {
-    feed: String,
-    user: String,
+    url: String,
+    owner: String,
+    result_sender: UnboundedSender<Result<(), String>>,
+}
+
+impl PartialEq for NewFeed {
+    fn eq(&self, other: &Self) -> bool {
+        self.url == other.url && self.owner == other.owner
+    }
 }
 
 #[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
@@ -91,13 +99,16 @@ pub struct ActiveFeed {
     url: String,
     id: i64,
     owner: i64,
-    added: DateTime<Utc>,
-    updated: DateTime<Utc>,
+    created: DateTime<Utc>,
     #[sqlx(flatten)]
     last_run: FeedRun,
 }
 
 #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, FromRow)]
 pub struct FeedRun {
+    feed: i64,
+    run: DateTime<Utc>,
     fetched: Option<DateTime<Utc>>,
     posted: Option<DateTime<Utc>>,
 }
@@ -140,47 +151,32 @@ impl BlogdorTheAggregator {
         self.cancel.cancelled().await
     }
 
-    pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>, client: reqwest::Client) {
+    pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
         let state = ServerState::new(
             self.db.clone(),
             &self.zulip_to_blogdor_email,
             &self.blogdor_token,
             announce_tx,
-            client,
+            self.client.clone(),
         );
         server::spawn_server(state, self.cancel.clone()).await;
     }
 
     pub async fn check_feeds(&self) -> Result<Vec<FeedResult>, String> {
         tracing::debug!("checking feeds");
-        let feed_query = sqlx::query(ACTIVE_FEEDS_QUERY);
         let feeds = self
-            .db_action(DbAction::FetchMany(feed_query))
+            .active_feeds()
             .await
-            .map_err(|e| {
-                tracing::error!("got error getting feeds from DB: {e}");
-                "couldn't get active feeds".to_string()
-            })?;
-        let DbValue::Many(feeds) = feeds else {
-            unreachable!()
-        };
+            .map_err(|_| "could not check feeds".to_string())?;
 
         let mut handles = JoinSet::new();
         for feed in feeds {
-            let id = feed.get("id");
-            let url = feed.get("url");
-            let created_at: DateTime<Utc> = feed.get("created_at");
-            let last = if let Ok(v) = self
-                .db_action(DbAction::FetchOne(sqlx::query(FETCH_RUN_QUERY)))
-                .await
-            {
-                let DbValue::One(r) = v else { unreachable!() };
-                r.get("date_time")
-            } else {
-                created_at
-            };
-            handles.spawn(check_feed(self.client.clone(), id, url, last));
+            let id = feed.id;
+            let url = feed.url;
+            let created_at = feed.created;
+            let last_run = feed.last_run;
+            let last = last_run.posted.unwrap_or(created_at);
+            handles.spawn(check_feed(self.client.clone(), id, url, last, feed.owner));
         }
 
         let mut feed_results = Vec::new();
@@ -195,7 +191,7 @@ impl BlogdorTheAggregator {
             tracing::error!("got error fetching feed: {e}");
             continue;
         };
-        //self.db_action(DbAction::Execute(sqlx::query("insert into succ")));
+
         feed_results.push(feed_result);
     }
@@ -203,12 +199,9 @@
     }
 
     pub async fn check_stale(&self) {
-        let feeds = match sqlx::query!("select id, url, added_by, created_at from feeds")
-            .fetch_all(&self.db)
-            .await
-        {
-            Err(e) => {
-                tracing::error!("could not fetch feeds: {e}");
+        let feeds = match self.active_feeds().await {
+            Err(_) => {
+                tracing::error!("could not check stale feeds");
                 return;
             }
             Ok(f) => f,
@@ -217,27 +210,11 @@
         let now = Utc::now();
 
         for feed in feeds.into_iter() {
-            let id = feed.id;
             let url = &feed.url;
-            let user = feed.added_by;
-            let fetched = match sqlx::query!(
-                "select fetched from fetches where feed = ? order by id desc limit 1",
-                id
-            )
-            .fetch_optional(&self.db)
-            .await
-            {
-                Err(e) => {
-                    tracing::error!("could not get last fetched for {url} from db: {e}");
-                    continue;
-                }
-                Ok(f) => f,
-            };
-            let dur = if let Some(fetched) = fetched {
-                now - fetched.fetched.and_utc()
-            } else {
-                now - feed.created_at.and_utc()
-            };
+            let user = feed.owner;
+            let run = feed.last_run;
+            let last = run.fetched.unwrap_or(feed.created);
+            let dur = now - last;
 
             if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 {
                 let hours = dur.num_hours() % 24;
@@ -260,8 +237,11 @@
         }
     }
 
-    pub async fn announce_feed(&self, announce: &NewFeed) {
-        let content = format!("{} added a new feed: {}", announce.user, announce.feed);
+    pub async fn add_feed(&self, announce: &NewFeed) {
+        let content = format!(
+            "@**|{}**: added a new feed: {}",
+            announce.owner, announce.url
+        );
         let msg = ZulipMessage {
             to: self.channel_id,
             typ: MessageType::Stream,
@@ -283,17 +263,13 @@
     // will also update the successful_runs table if it posts to zulip
     pub async fn post_entries(&self, posts: &[FeedEntry]) {
         let FeedEntry {
-            feed_id, received, ..
+            feed_id,
+            received,
+            owner,
+            ..
         } = posts.last().unwrap();
         let mut success = true;
-        let Ok(user) = sqlx::query!("select added_by from feeds where id = ?", feed_id)
-            .fetch_one(&self.db)
-            .await
-        else {
-            tracing::error!("could not get user from db");
-            return;
-        };
-        let user = user.added_by;
 
         for post in posts.iter() {
             let body = post.body.as_deref().unwrap_or("");
@@ -306,7 +282,7 @@
             let url = post.post_url.as_str();
             let title = post.title.as_str();
 
-            let header = format!("New post in a feed added by @**|{user}**: {title}");
+            let header = format!("New post in a feed added by @**|{owner}**: {title}");
 
             let content = format!(
                 "{header}\n---\n{body}{tail}\n\n---\noriginally posted to {url}, on {}",
@@ -336,19 +312,61 @@ impl BlogdorTheAggregator {
             }
             tokio::time::sleep(ZULIP_INTERVAL).await;
         }
-        if success
-            && let Err(e) = sqlx::query!(
-                "insert into successful_runs (feed, date_time) values (?, ?)",
-                feed_id,
-                received
-            )
-            .execute(&self.db)
-            .await
-        {
-            tracing::error!("could not insert run for {feed_id}, got {e}");
-        }
+        let now = Utc::now();
+        let posted = if success { Some(now) } else { None };
+        self.record_run(*feed_id, *received, posted).await;
     }
 
+    async fn record_run(&self, feed: i64, fetched: DateTime<Utc>, posted: Option<DateTime<Utc>>) {
+        let Ok(db_posted) = sqlx::query!(
+            "select posted from runs where feed = ? order by id desc limit 1",
+            feed
+        )
+        .fetch_optional(&self.db)
+        .await
+        else {
+            tracing::error!("got db error fetching runs");
+            return;
+        };
+        let db_posted = db_posted.and_then(|p| p.posted.map(|p| p.and_utc()));
+        let posted = posted.or(db_posted);
+        if let Err(e) = sqlx::query!(
+            "insert into runs (feed, fetched, posted) values (?, ?, ?)",
+            feed,
+            fetched,
+            posted
+        )
+        .execute(&self.db)
+        .await
+        {
+            tracing::error!("got error adding row to runs: {e}");
+        }
+    }
+
+    async fn add_user(&self, user: u32) -> Result<(), String> {
+        if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user)
+            .execute(&self.db)
+            .await
+        {
+            match e {
+                sqlx::Error::Database(database_error) => {
+                    // the users table has only one constraint, which is a uniqueness one on
+                    // zulip_id, so if it's violated, we don't care, it just means we already have
+                    // that user; if it's not a constraint violation, then something
+                    // else and bad has happened
+                    if database_error.constraint().is_some() {
+                        return Ok(());
+                    }
+                }
+                sqlx::Error::Io(error) => {
+                    tracing::error!("got IO error: {error}");
+                    return Err("you should maybe retry that".to_string());
+                }
+                _ => return Err("yikes".to_string()),
+            }
+        }
+        Ok(())
+    }
+
     async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result<Response, String> {
         let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
         self.client
@@ -360,6 +378,17 @@ impl BlogdorTheAggregator {
             .await
             .map_err(|e| format!("{e}"))
     }
+
+    async fn active_feeds(&self) -> Result<Vec<ActiveFeed>, ()> {
+        let feeds: Vec<ActiveFeed> = sqlx::query_as(ACTIVE_FEEDS_QUERY)
+            .fetch_all(&self.db)
+            .await
+            .map_err(|e| {
+                tracing::error!("error fetching feeds: {e}");
+            })?;
+        Ok(feeds)
+    }
 }
 
 trait Posted {
@@ -379,13 +408,12 @@ async fn check_feed(
     feed_id: i64,
     url: String,
     last_fetched: DateTime<Utc>,
+    owner: i64,
 ) -> Result<FeedResult, String> {
     tracing::debug!("checking {url}");
     let now = Utc::now();
 
     let mut feed = fetch_and_parse_feed(&url, &client).await?;
     let mut entries = None;
     feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
 
     for post in feed.entries.into_iter().take(5) {
@@ -398,6 +426,7 @@ async fn check_feed(
             .map(|l| l.href)
             .unwrap_or("<url not found>".to_string()),
         feed_id,
+        owner,
         feed_url: url.clone(),
         title: post
             .title
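One subtlety in the new ActiveFeed/FeedRun pair above: #[sqlx(flatten)] splits a single joined row from ACTIVE_FEEDS_QUERY between the outer struct and the nested one. A reduced sketch of the mechanism, with illustrative names and a trimmed-down query:

use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{FromRow, SqlitePool};

#[derive(Debug, FromRow)]
struct Run {
    run: DateTime<Utc>,
    fetched: Option<DateTime<Utc>>,
}

#[derive(Debug, FromRow)]
struct FeedWithRun {
    id: i64,
    url: String,
    // flatten: Run's fields are decoded from columns of the same row
    #[sqlx(flatten)]
    last_run: Run,
}

// sketch: the join yields one row per feed; query_as fills both structs
async fn feeds_with_runs(db: &SqlitePool) -> Result<Vec<FeedWithRun>, sqlx::Error> {
    sqlx::query_as::<_, FeedWithRun>(
        "SELECT feeds.id, feeds.url, r.run, r.fetched
         FROM feeds INNER JOIN runs r ON feeds.id = r.feed",
    )
    .fetch_all(db)
    .await
}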

View file

@@ -12,7 +12,7 @@ async fn main() {
     let bta = BlogdorTheAggregator::new().await;
     let (tx, rx) = unbounded_channel();
 
-    bta.spawn_http(tx, bta.client()).await;
+    bta.spawn_http(tx).await;
 
     run_loop(&bta, rx).await;

View file

@@ -116,8 +116,8 @@ async fn handle_manage_feed(
         match add_feed(&state.db, sender_id, command.feed, &state.client).await {
             Ok(_) => {
                 let _ = state.announce_tx.send(NewFeed {
-                    feed: command.feed.to_string(),
-                    user: sender_full_name,
+                    url: command.feed.to_string(),
+                    owner: sender_full_name,
                 });
                 resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
             }
@@ -214,31 +214,6 @@ async fn add_feed(
     Ok(())
 }
 
-async fn add_user(db: &SqlitePool, user: u32) -> Result<(), String> {
-    if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user)
-        .execute(db)
-        .await
-    {
-        match e {
-            sqlx::Error::Database(database_error) => {
-                // the users table has only one constraint, which is a uniqueness one on
-                // zulip_id, so if it's violated, we don't care, it just means we already have
-                // that user; if it's not a constraint violation, then something
-                // else and bad has happened
-                if database_error.constraint().is_some() {
-                    return Ok(());
-                }
-            }
-            sqlx::Error::Io(error) => {
-                tracing::error!("got IO error: {error}");
-                return Err("you should maybe retry that".to_string());
-            }
-            _ => return Err("yikes".to_string()),
-        }
-    }
-    Ok(())
-}
-
 async fn graceful_shutdown(cancel: CancellationToken) {
     use tokio::signal;
 
     let ctrl_c = async {
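A note on the add_user move (out of server.rs, onto BlogdorTheAggregator): it treats a uniqueness violation as success, so repeat users are a no-op. SQLite can express the same idempotency in the statement itself; a sketch of that alternative (add_user_idempotent is hypothetical, not what this commit does):

use sqlx::SqlitePool;

// sketch: let the upsert clause swallow duplicate zulip_ids instead of
// matching on the constraint error after the fact
async fn add_user_idempotent(db: &SqlitePool, user: u32) -> Result<(), sqlx::Error> {
    sqlx::query("INSERT INTO users (zulip_id) VALUES (?) ON CONFLICT (zulip_id) DO NOTHING")
        .bind(user)
        .execute(db)
        .await?;
    Ok(())
}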