// blogdor/src/lib.rs
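//! Blogdor the Aggregator: polls the RSS/Atom feeds registered in its
//! SQLite database and announces new posts to a Zulip channel.
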
use std::time::Duration;
use feed_rs::parser::parse;
use reqwest::{Response, StatusCode};
use server::ServerState;
use sqlx::{
    SqlitePool,
    sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
    types::chrono::{DateTime, Utc},
};
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
use tokio_util::{bytes::Buf, sync::CancellationToken};
use unicode_segmentation::UnicodeSegmentation;
pub mod server;
const MAX_CONNS: u32 = 200;
const MIN_CONNS: u32 = 5;
const TIMEOUT: u64 = 2000; // in milliseconds
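// Minimum gap between consecutive Zulip posts (see `post_entries`).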
const ZULIP_INTERVAL: Duration = Duration::from_millis(250);
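// Post bodies are truncated to this many graphemes before being sent.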
const ZULIP_MESSAGE_CUTOFF: usize = 700;
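// Sentinel "last fetched" value for feeds with no successful runs: the Unix epoch.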
const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
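/// The aggregator: owns the DB pool, the HTTP client, and the Zulip
/// credentials. `new` reads its configuration from the environment
/// (`ZULIP_URL`, `ZULIP_CHANNEL`, `ZULIP_TOKEN`, `BLOGDOR_TO_ZULIP_EMAIL`,
/// `ZULIP_TO_BLOGDOR_EMAIL`, `BLOGDOR_TOKEN`, and optionally
/// `DATABASE_FILE`), and panics if a required variable is missing.
///
/// A minimal wiring sketch (hypothetical `main` body; assumes a tokio
/// runtime and a reachable Zulip instance, so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let blogdor = BlogdorTheAggregator::new().await;
/// // channel over which the HTTP server reports newly added feeds
/// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
/// blogdor.spawn_http(tx).await;
/// while let Some(new_feed) = rx.recv().await {
///     blogdor.announce_feed(&new_feed).await;
/// }
/// ```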
pub struct BlogdorTheAggregator {
    db: SqlitePool,
    client: reqwest::Client,
    cancel: CancellationToken,
    endpoint: String,
    channel_id: u32,
    blogdor_to_zulip_email: String,
    zulip_to_blogdor_email: String,
    zulip_token: String,   // sent *to zulip* in POSTs *from us*
    blogdor_token: String, // sent *from zulip* in POSTs *to us*
}
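/// One post pulled out of a feed, ready to be formatted for Zulip.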
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FeedEntry {
    post_url: String,
    feed_url: String,
    feed_id: i64,
    title: String,
    published: DateTime<Utc>,
    received: DateTime<Utc>,
    feed_description: Option<String>,
    body: Option<String>,
}
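/// The outcome of checking one feed; `entries` is `None` when there was
/// nothing new.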
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FeedResult {
    pub entries: Option<Vec<FeedEntry>>,
    pub url: String,
    pub feed_id: i64,
}
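/// A feed newly registered through the HTTP server, and who added it.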
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewFeed {
    feed: String,
    user: String,
}
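/// Form parameters for Zulip's send-message API; serialized with
/// `serde_urlencoded` in `send_zulip_message`.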
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
struct ZulipMessage<'s> {
    to: u32,
    #[serde(rename = "type")]
    typ: &'s str,
    content: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    topic: Option<&'s str>,
}
impl BlogdorTheAggregator {
    pub async fn new() -> Self {
        let db = get_db_pool().await;
        let client = reqwest::Client::new(); // TODO: retries?
        let cancel = CancellationToken::new();
        let endpoint = std::env::var("ZULIP_URL").expect("ZULIP_URL must be set");
        let channel_id: u32 = std::env::var("ZULIP_CHANNEL")
            .expect("ZULIP_CHANNEL must be set")
            .parse()
            .expect("ZULIP_CHANNEL must be an integer");
        let password = std::env::var("ZULIP_TOKEN").expect("ZULIP_TOKEN must be set");
        let b2z_email =
            std::env::var("BLOGDOR_TO_ZULIP_EMAIL").expect("BLOGDOR_TO_ZULIP_EMAIL must be set");
        let z2b_email =
            std::env::var("ZULIP_TO_BLOGDOR_EMAIL").expect("ZULIP_TO_BLOGDOR_EMAIL must be set");
        let token = std::env::var("BLOGDOR_TOKEN").expect("BLOGDOR_TOKEN must be set");
        Self {
            db,
            client,
            cancel,
            endpoint,
            channel_id,
            blogdor_to_zulip_email: b2z_email,
            zulip_to_blogdor_email: z2b_email,
            zulip_token: password,
            blogdor_token: token,
        }
    }
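    /// Resolves once shutdown has been requested via the cancellation token.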
    pub async fn cancelled(&self) {
        self.cancel.cancelled().await
    }
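    /// Start the HTTP server that receives feed submissions from Zulip;
    /// each accepted feed is reported back over `announce_tx`.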
    pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
        let state = ServerState::new(
            self.db.clone(),
            &self.zulip_to_blogdor_email,
            &self.blogdor_token,
            announce_tx,
        );
        server::spawn_server(state, self.cancel.clone()).await;
    }
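    /// Fetch every active feed concurrently, one `JoinSet` task per feed.
    /// The outer `Result` is a DB error; each inner `Result` is one feed's
    /// outcome.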
    pub async fn check_feeds(&self) -> Result<Vec<Result<FeedResult, String>>, String> {
        tracing::debug!("checking feeds");
        let feeds = sqlx::query!("select id, url from feeds where active = true")
            .fetch_all(&self.db)
            .await
            .map_err(|e| format!("{e}"))?;
        let mut handles = JoinSet::new();
        for feed in feeds {
            handles.spawn(check_feed(
                self.db.clone(),
                feed.id,
                self.client.clone(),
                feed.url,
            ));
        }
        let mut feed_results = Vec::new();
        while let Some(feed_result) = handles.join_next().await {
            // a join error means the task itself panicked or was aborted;
            // log it and move on rather than losing the other feeds
            let feed_result = match feed_result {
                Ok(r) => r,
                Err(e) => {
                    tracing::error!("got join error: {e}");
                    continue;
                }
            };
            feed_results.push(feed_result);
        }
        Ok(feed_results)
    }
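    /// Announce a newly added feed to the configured Zulip channel.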
    pub async fn announce_feed(&self, announce: &NewFeed) {
        let content = format!("{} added a new feed: {}", announce.user, announce.feed);
        let msg = ZulipMessage {
            to: self.channel_id,
            typ: "stream",
            content,
            topic: Some("New feeds"),
        };
        match self.send_zulip_message(&msg).await {
            Err(e) => {
                tracing::error!("got error sending to zulip: {e}");
            }
            Ok(r) => {
                if r.status() != StatusCode::OK {
                    tracing::warn!("did not successfully post to zulip: status {}", r.status());
                }
            }
        }
    }
    /// Post each entry to Zulip; if every message succeeds, also record the
    /// run in the `successful_runs` table so the posts aren't re-sent.
    pub async fn post_entries(&self, posts: &[FeedEntry]) {
        // all entries in a batch come from the same feed, so the last one is
        // as good a source for the feed id and fetch time as any; bail out
        // on an empty batch instead of panicking
        let Some(&FeedEntry {
            feed_id, received, ..
        }) = posts.last()
        else {
            return;
        };
        let mut success = true;
        let Ok(user) = sqlx::query!("select added_by from feeds where id = ?", feed_id)
            .fetch_one(&self.db)
            .await
        else {
            tracing::error!("could not get user from db");
            return;
        };
        let user = user.added_by;
        for post in posts.iter() {
            let body = post.body.as_deref().unwrap_or("Blogdor Says: NO BODY!");
            // bodies were truncated to ZULIP_MESSAGE_CUTOFF graphemes in
            // check_feed, so count graphemes (not bytes) to decide whether
            // an ellipsis is needed
            let tail = if body.graphemes(false).count() < ZULIP_MESSAGE_CUTOFF {
                ""
            } else {
                "..."
            };
            let url = post.post_url.as_str();
            let title = post.title.as_str();
            let header = format!("New post in a feed added by @**|{user}**: {title}");
            let content = format!(
                "{header}\n---\n{body}{tail}\n\n---\noriginally posted to {url}, on {}",
                post.published.format("%B %e, %Y"),
            );
            let msg = ZulipMessage {
                to: self.channel_id,
                typ: "stream",
                content,
                topic: Some(title),
            };
            match self.send_zulip_message(&msg).await {
                Err(e) => {
                    tracing::error!("got error sending to zulip: {e}");
                    success = false;
                }
                Ok(r) => {
                    if r.status() != StatusCode::OK {
                        tracing::warn!(
                            "did not successfully post to zulip: status {}",
                            r.status()
                        );
                        success = false;
                    }
                }
            }
            tokio::time::sleep(ZULIP_INTERVAL).await;
        }
        if success
            && let Err(e) = sqlx::query!(
                "insert into successful_runs (feed, date_time) values (?, ?)",
                feed_id,
                received
            )
            .execute(&self.db)
            .await
        {
            tracing::error!("could not insert run for {feed_id}, got {e}");
        }
    }
    pub async fn close_db(&self) {
        self.db.close().await;
    }
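    /// POST a message to Zulip as the bot, using HTTP basic auth (bot email
    /// plus API token) and a form-encoded body, which is what Zulip's
    /// send-message endpoint expects.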
    async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result<Response, String> {
        let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
        self.client
            .post(&self.endpoint)
            .basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token))
            .body(msg)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .send()
            .await
            .map_err(|e| format!("{e}"))
    }
}
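/// The best available timestamp for a feed entry: `published` if the feed
/// supplies one, otherwise `updated`.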
trait Posted {
    fn posted(&self) -> Option<DateTime<Utc>>;
}

impl Posted for feed_rs::model::Entry {
    fn posted(&self) -> Option<DateTime<Utc>> {
        self.published.or(self.updated)
    }
}
// takes args by value because it's meant to be called from inside a spawned
// tokio task scope
async fn check_feed(
    db: SqlitePool,
    feed_id: i64,
    client: reqwest::Client,
    url: String,
) -> Result<FeedResult, String> {
    let rec = sqlx::query!(
        "select date_time from successful_runs where feed = ? order by id desc limit 1",
        feed_id
    )
    .fetch_optional(&db)
    .await
    .map_err(|e| format!("Could not fetch runs for {url} from DB, got {e}"))?;
    tracing::debug!("checking {url}");
    let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
    let now = Utc::now();
    let mut entries = None;
    let feed = client
        .get(&url)
        .send()
        .await
        .map_err(|e| format!("could not get feed from {url}, got {e}"))?
        .bytes()
        .await
        .map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
    let mut feed =
        parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;
    // newest first, then consider only the five most recent posts
    feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
    for post in feed.entries.into_iter().take(5) {
        // undated posts fall back to the epoch sentinel and so are never
        // considered new
        if post.posted().unwrap_or(LAST_FETCHED) > last_fetched {
            let entry = FeedEntry {
                post_url: post
                    .links
                    .first()
                    .cloned()
                    .map(|l| l.href)
                    .unwrap_or_else(|| "Blogdor Says: NO POST URL".to_string()),
                feed_id,
                feed_url: url.clone(),
                title: post
                    .title
                    .clone()
                    .map(|t| t.content)
                    .unwrap_or_else(|| "Blogdor Says: NO POST TITLE".to_string()),
                published: post.posted().unwrap_or(now),
                received: now,
                feed_description: feed.description.clone().map(|d| d.content),
                body: post.content.and_then(|c| {
                    c.body.map(|f| {
                        // convert the HTML body to markdown and truncate it
                        // to the Zulip cutoff, counting graphemes so a
                        // multi-byte character is never split
                        html2md::parse_html(&f)
                            .graphemes(false)
                            .take(ZULIP_MESSAGE_CUTOFF)
                            .collect::<String>()
                    })
                }),
            };
            entries.get_or_insert(Vec::new()).push(entry);
        }
    }
    Ok(FeedResult {
        entries,
        url,
        feed_id,
    })
}
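/// Open the SQLite pool and run migrations. `DATABASE_FILE` overrides the
/// default `blogdor.db`; under `cfg(test)` each test gets its own shared
/// in-memory database instead.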
async fn get_db_pool() -> SqlitePool {
    let db_filename = std::env::var("DATABASE_FILE").unwrap_or_else(|_| {
        #[cfg(not(test))]
        {
            tracing::info!("connecting to default db file");
            "blogdor.db".to_string()
        }
        #[cfg(test)]
        {
            use rand::RngCore;
            let mut rng = rand::rng();
            let id = rng.next_u64();
            // see https://www.sqlite.org/inmemorydb.html for meaning of the string;
            // it allows each separate test to have its own dedicated memory-backed db that
            // will live as long as the whole process
            format!("file:testdb-{id}?mode=memory&cache=shared")
        }
    });
    tracing::info!("Connecting to DB at {db_filename}");
    let conn_opts = SqliteConnectOptions::new()
        .foreign_keys(true)
        .journal_mode(SqliteJournalMode::Wal)
        .synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
        .filename(&db_filename)
        // TIMEOUT is in milliseconds, so build the Duration accordingly
        .busy_timeout(Duration::from_millis(TIMEOUT))
        .pragma("temp_store", "memory")
        .create_if_missing(true)
        .optimize_on_close(true, None)
        .pragma("mmap_size", "3000000000");
    let pool = SqlitePoolOptions::new()
        .max_connections(MAX_CONNS)
        .min_connections(MIN_CONNS)
        .idle_timeout(Some(Duration::from_secs(3)))
        .max_lifetime(Some(Duration::from_secs(3600)))
        .connect_with(conn_opts)
        .await
        .expect("could not get sqlite pool");
    sqlx::migrate!()
        .run(&pool)
        .await
        .expect("could not run migrations");
    tracing::info!("Ran migrations");
    pool
}
//-************************************************************************
// Tests for the DB helpers.
//-************************************************************************
#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn it_migrates_the_db() {
        let db = super::get_db_pool().await;
        let r = sqlx::query!("select count(*) as count from feeds")
            .fetch_one(&db)
            .await;
        assert!(r.is_ok());
    }
}