use std::time::Duration;

use feed_rs::parser::parse;
use reqwest::{Response, StatusCode};
use server::ServerState;
use sqlx::{
    SqlitePool,
    sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
    types::chrono::{DateTime, Utc},
};
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
use tokio_util::{bytes::Buf, sync::CancellationToken};
use unicode_segmentation::UnicodeSegmentation;

pub mod server;

const MAX_CONNS: u32 = 200;
const MIN_CONNS: u32 = 5;
const TIMEOUT: u64 = 2000; // in milliseconds
const ZULIP_INTERVAL: Duration = Duration::from_millis(250);
const ZULIP_MESSAGE_CUTOFF: usize = 700;
const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);

pub struct BlogdorTheAggregator {
    db: SqlitePool,
    client: reqwest::Client,
    cancel: CancellationToken,
    endpoint: String,
    channel_id: u32,
    blogdor_to_zulip_email: String,
    zulip_to_blogdor_email: String,
    zulip_token: String,   // sent *to zulip* in POSTs *from us*
    blogdor_token: String, // sent *from zulip* in POSTs *to us*
}

#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FeedEntry {
    post_url: String,
    feed_url: String,
    feed_id: i64,
    title: String,
    published: DateTime<Utc>,
    received: DateTime<Utc>,
    feed_description: Option<String>,
    body: Option<String>,
}

#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FeedResult {
    pub entries: Option<Vec<FeedEntry>>,
    pub url: String,
    pub feed_id: i64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewFeed {
    feed: String,
    user: String,
}

#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
struct ZulipMessage<'s> {
    to: u32,
    #[serde(rename = "type")]
    typ: MessageType,
    content: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    topic: Option<&'s str>,
}

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
enum MessageType {
    #[default]
    Stream,
    Direct,
}
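// A quick sanity check (with made-up values) of the x-www-form-urlencoded
// shape that `send_zulip_message` produces from `ZulipMessage`. The expected
// strings only reflect the serde `rename`/`rename_all`/`skip_serializing_if`
// attributes above; they say nothing extra about what Zulip's API requires.
#[cfg(test)]
mod wire_format_tests {
    use super::{MessageType, ZulipMessage};

    #[test]
    fn stream_message_serializes_with_renamed_fields() {
        let msg = ZulipMessage {
            to: 42,
            typ: MessageType::Stream,
            content: "hello world".to_string(),
            topic: Some("New feeds"),
        };
        let encoded = serde_urlencoded::to_string(&msg).unwrap();
        assert_eq!(encoded, "to=42&type=stream&content=hello+world&topic=New+feeds");
    }

    #[test]
    fn direct_message_omits_the_topic() {
        let msg = ZulipMessage {
            to: 7,
            typ: MessageType::Direct,
            content: "hi".to_string(),
            topic: None,
        };
        let encoded = serde_urlencoded::to_string(&msg).unwrap();
        assert_eq!(encoded, "to=7&type=direct&content=hi");
    }
}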
impl BlogdorTheAggregator {
    pub async fn new() -> Self {
        let db = get_db_pool().await;
        let client = reqwest::Client::new(); // TODO: retries?
        let cancel = CancellationToken::new();
        let endpoint = std::env::var("ZULIP_URL").expect("ZULIP_URL must be set");
        let channel_id: u32 = std::env::var("ZULIP_CHANNEL")
            .expect("ZULIP_CHANNEL must be set")
            .parse()
            .expect("ZULIP_CHANNEL must be an integer");
        let password = std::env::var("ZULIP_TOKEN").expect("ZULIP_TOKEN must be set");
        let b2z_email =
            std::env::var("BLOGDOR_TO_ZULIP_EMAIL").expect("BLOGDOR_TO_ZULIP_EMAIL must be set");
        let z2b_email =
            std::env::var("ZULIP_TO_BLOGDOR_EMAIL").expect("ZULIP_TO_BLOGDOR_EMAIL must be set");
        let token = std::env::var("BLOGDOR_TOKEN").expect("BLOGDOR_TOKEN must be set");

        Self {
            db,
            client,
            cancel,
            endpoint,
            channel_id,
            blogdor_to_zulip_email: b2z_email,
            zulip_to_blogdor_email: z2b_email,
            zulip_token: password,
            blogdor_token: token,
        }
    }

    pub async fn cancelled(&self) {
        self.cancel.cancelled().await
    }

    pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
        let state = ServerState::new(
            self.db.clone(),
            &self.zulip_to_blogdor_email,
            &self.blogdor_token,
            announce_tx,
        );
        server::spawn_server(state, self.cancel.clone()).await;
    }

    pub async fn check_feeds(&self) -> Result<Vec<Result<FeedResult, String>>, String> {
        tracing::debug!("checking feeds");
        let feeds = sqlx::query!("select id, url from feeds where active = true")
            .fetch_all(&self.db)
            .await
            .map_err(|e| format!("{e}"))?;

        let mut handles = JoinSet::new();
        for feed in feeds {
            handles.spawn(check_feed(
                self.db.clone(),
                feed.id,
                self.client.clone(),
                feed.url,
            ));
        }

        let mut feed_results = Vec::new();
        while let Some(joined) = handles.join_next().await {
            match joined {
                Ok(feed_result) => feed_results.push(feed_result),
                Err(e) => tracing::error!("got join error: {e}"),
            }
        }
        Ok(feed_results)
    }
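    // A minimal sketch (not part of the original API) of how a binary crate
    // might drive `check_feeds` and `post_entries` together with the
    // cancellation token; the 10-minute interval is an assumption for
    // illustration, not something the aggregator itself prescribes.
    #[allow(dead_code)]
    async fn example_poll_loop(&self) {
        loop {
            tokio::select! {
                _ = self.cancelled() => break,
                _ = tokio::time::sleep(Duration::from_secs(600)) => {
                    match self.check_feeds().await {
                        Err(e) => tracing::error!("could not check feeds: {e}"),
                        Ok(results) => {
                            for result in results {
                                match result {
                                    Err(e) => tracing::warn!("feed fetch failed: {e}"),
                                    Ok(FeedResult { entries: Some(entries), .. }) => {
                                        self.post_entries(&entries).await;
                                    }
                                    // nothing new in this feed since the last run
                                    Ok(_) => {}
                                }
                            }
                        }
                    }
                }
            }
        }
    }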
    pub async fn check_stale(&self) {
        let feeds = match sqlx::query!("select id, url, added_by, created_at from feeds")
            .fetch_all(&self.db)
            .await
        {
            Err(e) => {
                tracing::error!("could not fetch feeds: {e}");
                return;
            }
            Ok(f) => f,
        };

        let now = Utc::now();
        for feed in feeds.into_iter() {
            let id = feed.id;
            let url = &feed.url;
            let user = feed.added_by;
            let fetched = match sqlx::query!(
                "select fetched from fetches where feed = ? order by id desc limit 1",
                id
            )
            .fetch_optional(&self.db)
            .await
            {
                Err(e) => {
                    tracing::error!("could not get last fetched for {url} from db: {e}");
                    continue;
                }
                Ok(f) => f,
            };

            // measure staleness from the last recorded fetch, falling back to
            // the feed's creation time if it has never been fetched
            let dur = if let Some(fetched) = fetched {
                now - fetched.fetched.and_utc()
            } else {
                now - feed.created_at.and_utc()
            };

            if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 {
                let hours = dur.num_hours() % 24;
                let days = dur.num_days();
                let content = format!(
                    "It has been {days} days and {hours} hours since Blogdor was able to check your feed at {url}. If you'd like to remove it, you can DM @blogdor's manager with 'remove {url}'."
                );
                let msg = ZulipMessage {
                    to: user as u32,
                    typ: MessageType::Direct,
                    content,
                    topic: None,
                };
                if let Err(e) = self.send_zulip_message(&msg).await {
                    tracing::error!("error sending zulip message to user {user}: {e}");
                }
            }
        }
    }

    pub async fn announce_feed(&self, announce: &NewFeed) {
        let content = format!("{} added a new feed: {}", announce.user, announce.feed);
        let msg = ZulipMessage {
            to: self.channel_id,
            typ: MessageType::Stream,
            content,
            topic: Some("New feeds"),
        };
        match self.send_zulip_message(&msg).await {
            Err(e) => {
                tracing::error!("got error sending to zulip: {e}");
            }
            Ok(r) => {
                if r.status() != StatusCode::OK {
                    tracing::warn!("did not successfully post to zulip: status {}", r.status());
                }
            }
        }
    }

    // will also update the successful_runs table if every post makes it to zulip
    pub async fn post_entries(&self, posts: &[FeedEntry]) {
        let Some(FeedEntry { feed_id, received, .. }) = posts.last() else {
            // nothing to post
            return;
        };
        let mut success = true;
        let Ok(user) = sqlx::query!("select added_by from feeds where id = ?", feed_id)
            .fetch_one(&self.db)
            .await
        else {
            tracing::error!("could not get user from db");
            return;
        };
        let user = user.added_by;

        for post in posts.iter() {
            let body = post.body.as_deref().unwrap_or("");
            // the body was truncated to ZULIP_MESSAGE_CUTOFF graphemes in
            // check_feed, so count graphemes (not bytes) to decide whether
            // to show an ellipsis
            let tail = if body.graphemes(false).count() < ZULIP_MESSAGE_CUTOFF {
                ""
            } else {
                "..."
            };
            let url = post.post_url.as_str();
            let title = post.title.as_str();
            let header = format!("New post in a feed added by @**|{user}**: {title}");
            let content = format!(
                "{header}\n---\n{body}{tail}\n\n---\noriginally posted to {url}, on {}",
                post.published.format("%B %e, %Y"),
            );
            let msg = ZulipMessage {
                to: self.channel_id,
                typ: MessageType::Stream,
                content,
                topic: Some(title),
            };
            match self.send_zulip_message(&msg).await {
                Err(e) => {
                    tracing::error!("got error sending to zulip: {e}");
                    success = false;
                }
                Ok(r) => {
                    if r.status() != StatusCode::OK {
                        tracing::warn!("did not successfully post to zulip: status {}", r.status());
                        success = false;
                    }
                }
            }
            tokio::time::sleep(ZULIP_INTERVAL).await;
        }

        if success
            && let Err(e) = sqlx::query!(
                "insert into successful_runs (feed, date_time) values (?, ?)",
                feed_id,
                received
            )
            .execute(&self.db)
            .await
        {
            tracing::error!("could not insert run for {feed_id}, got {e}");
        }
    }

    pub async fn close_db(&self) {
        self.db.close().await;
    }

    async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result<Response, String> {
        let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
        self.client
            .post(&self.endpoint)
            .basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token))
            .body(msg)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .send()
            .await
            .map_err(|e| format!("{e}"))
    }
}

trait Posted {
    fn posted(&self) -> Option<DateTime<Utc>>;
}

impl Posted for feed_rs::model::Entry {
    fn posted(&self) -> Option<DateTime<Utc>> {
        self.published.or(self.updated)
    }
}
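// A small demonstration (test-only, made-up input) of the grapheme-based
// truncation that `check_feed` applies to post bodies below: taking the first
// ZULIP_MESSAGE_CUTOFF grapheme clusters can never split a multi-byte
// character or a combining sequence, unlike slicing at a byte index.
#[cfg(test)]
mod truncation_tests {
    use unicode_segmentation::UnicodeSegmentation;

    use super::ZULIP_MESSAGE_CUTOFF;

    #[test]
    fn truncation_keeps_grapheme_clusters_intact() {
        // one grapheme cluster made of two code points: 'e' + combining acute
        let body = "e\u{301}".repeat(ZULIP_MESSAGE_CUTOFF + 10);
        let truncated: String = body
            .graphemes(false) // same legacy-cluster mode `check_feed` uses
            .take(ZULIP_MESSAGE_CUTOFF)
            .collect();
        assert_eq!(truncated.graphemes(false).count(), ZULIP_MESSAGE_CUTOFF);
        // the cut lands between clusters, so the combining mark stays attached
        assert!(truncated.ends_with('\u{301}'));
    }
}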
// takes args by value because it's meant to be called from inside a spawned
// tokio task scope
async fn check_feed(
    db: SqlitePool,
    feed_id: i64,
    client: reqwest::Client,
    url: String,
) -> Result<FeedResult, String> {
    let rec = sqlx::query!(
        "select date_time from successful_runs where feed = ? order by id desc limit 1",
        feed_id
    )
    .fetch_optional(&db)
    .await
    .map_err(|e| format!("Could not fetch runs for {url} from DB, got {e}"))?;

    tracing::debug!("checking {url}");
    let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
    let now = Utc::now();
    let mut entries = None;

    let feed = client
        .get(&url)
        .send()
        .await
        .map_err(|e| format!("could not get feed from {url}, got {e}"))?
        .bytes()
        .await
        .map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
    let mut feed =
        parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;

    if let Err(e) = sqlx::query!("insert into fetches (feed) values (?)", feed_id)
        .execute(&db)
        .await
    {
        tracing::error!("got error inserting {feed_id} into fetches: {e}");
    }

    // newest first, so we only consider the five most recent posts
    feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
    for post in feed.entries.into_iter().take(5) {
        if post.posted().unwrap_or(LAST_FETCHED) > last_fetched {
            let entry = FeedEntry {
                post_url: post
                    .links
                    .first()
                    .cloned()
                    .map(|l| l.href)
                    .unwrap_or_default(),
                feed_id,
                feed_url: url.clone(),
                title: post
                    .title
                    .clone()
                    .map(|t| t.content)
                    .unwrap_or_default(),
                published: post.posted().unwrap_or(now),
                received: now,
                feed_description: feed.description.to_owned().map(|d| d.content),
                body: post.content.and_then(|c| {
                    c.body.map(|f| {
                        // convert to markdown, then truncate on grapheme
                        // boundaries to the Zulip cutoff
                        html2md::parse_html(&f)
                            .graphemes(false)
                            .take(ZULIP_MESSAGE_CUTOFF)
                            .collect::<String>()
                    })
                }),
            };
            entries.get_or_insert(Vec::new()).push(entry);
        }
    }

    Ok(FeedResult {
        entries,
        url,
        feed_id,
    })
}

async fn get_db_pool() -> SqlitePool {
    let db_filename = {
        std::env::var("DATABASE_FILE").unwrap_or_else(|_| {
            #[cfg(not(test))]
            {
                tracing::info!("connecting to default db file");
                "blogdor.db".to_string()
            }
            #[cfg(test)]
            {
                use rand::RngCore;
                let mut rng = rand::rng();
                let id = rng.next_u64();
                // see https://www.sqlite.org/inmemorydb.html for meaning of the string;
                // it allows each separate test to have its own dedicated memory-backed db that
                // will live as long as the whole process
                format!("file:testdb-{id}?mode=memory&cache=shared")
            }
        })
    };
    tracing::info!("Connecting to DB at {db_filename}");

    let conn_opts = SqliteConnectOptions::new()
        .foreign_keys(true)
        .journal_mode(SqliteJournalMode::Wal)
        .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
        .filename(&db_filename)
        .busy_timeout(Duration::from_millis(TIMEOUT)) // TIMEOUT is in milliseconds
        .pragma("temp_store", "memory")
        .create_if_missing(true)
        .optimize_on_close(true, None)
        .pragma("mmap_size", "3000000000");

    let pool = SqlitePoolOptions::new()
        .max_connections(MAX_CONNS)
        .min_connections(MIN_CONNS)
        .idle_timeout(Some(Duration::from_secs(3)))
        .max_lifetime(Some(Duration::from_secs(3600)))
        .connect_with(conn_opts)
        .await
        .expect("could not get sqlite pool");

    sqlx::migrate!()
        .run(&pool)
        .await
        .expect("could not run migrations");
    tracing::info!("Ran migrations");

    pool
}

//-************************************************************************
// Tests for `db` module.
//-************************************************************************
#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn it_migrates_the_db() {
        let db = super::get_db_pool().await;
        let r = sqlx::query!("select count(*) as count from feeds")
            .fetch_one(&db)
            .await;
        assert!(r.is_ok());
    }
}
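// A sanity check (test-only) on the never-fetched sentinel: `LAST_FETCHED`
// is the Unix epoch, so it sorts before any real publish timestamp and every
// recent entry in a brand-new feed is treated as unseen by `check_feed`.
#[cfg(test)]
mod sentinel_tests {
    use sqlx::types::chrono::Utc;

    use super::LAST_FETCHED;

    #[test]
    fn last_fetched_sentinel_predates_any_real_fetch() {
        assert_eq!(LAST_FETCHED.timestamp(), 0);
        assert!(LAST_FETCHED < Utc::now());
    }
}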