From 2cafaee52fa520beb90309f7f007d3e2942cc764 Mon Sep 17 00:00:00 2001 From: Joe Date: Sun, 7 Dec 2025 22:12:15 -0800 Subject: [PATCH] checkpoint --- .rustfmt.toml | 4 +++ src/lib.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++------- src/main.rs | 2 +- src/server.rs | 4 +-- 4 files changed, 75 insertions(+), 13 deletions(-) create mode 100644 .rustfmt.toml diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..8feb93e --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,4 @@ +imports_granularity = "Crate" +group_imports = "StdExternalCrate" +wrap_comments = true +edition = "2024" diff --git a/src/lib.rs b/src/lib.rs index 95556bc..6b71b0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,22 +1,25 @@ use std::time::Duration; + +use feed_rs::{model::Content, parser::parse}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; pub mod server; -use reqwest::Client; use sqlx::{ - sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, - types::chrono::DateTime, - types::chrono::Utc, SqlitePool, + sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, + types::chrono::{DateTime, Utc}, }; use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; +use tokio_util::{bytes::Buf, sync::CancellationToken}; const MAX_CONNS: u32 = 200; const MIN_CONNS: u32 = 5; const TIMEOUT: u64 = 2000; // in milliseconds +const LAST_FETCHED: DateTime = DateTime::from_timestamp_nanos(0); +const ONE_YEAR: Duration = Duration::from_secs(365 * 24 * 60 * 60); + pub struct BlogdorTheAggregator { db: SqlitePool, client: reqwest::Client, @@ -29,7 +32,7 @@ pub struct FeedEntry { title: String, published: DateTime, received: DateTime, - description: Option, + feed_description: Option, body: Option, } @@ -44,7 +47,7 @@ impl BlogdorTheAggregator { .init(); let db = get_db_pool().await; - let client = reqwest::Client::new(); + let client = reqwest::Client::new(); // TODO: retries? let cancel = CancellationToken::new(); Self { db, client, cancel } @@ -70,7 +73,7 @@ impl BlogdorTheAggregator { }) } - pub async fn listen(&self) -> JoinHandle<()> { + pub async fn listen_http(&self) -> JoinHandle<()> { server::spawn_server(self.db.clone(), self.cancel.clone()).await } @@ -79,7 +82,7 @@ impl BlogdorTheAggregator { } } -async fn check_feeds(db: &SqlitePool, _client: &Client) { +async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) { tracing::debug!("checking feeds"); let feeds = match sqlx::query!("select id, url from feeds where active = true") .fetch_all(db) @@ -92,9 +95,64 @@ async fn check_feeds(db: &SqlitePool, _client: &Client) { } }; + // a channel to receive feed entries over, from the feed-reading tasks + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + for feed in feeds { - let id = feed.id; let url = feed.url; + let client = client.clone(); + let db = db.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + check_feed(db, feed.id, client, url, tx).await; + }); + } +} + +async fn check_feed( + db: SqlitePool, + id: i64, + client: reqwest::Client, + url: String, + tx: tokio::sync::mpsc::UnboundedSender, +) { + if let Ok(rec) = sqlx::query!( + "select date_time from runs where succeeded = true and feed = ? order by id desc limit 1", + id + ) + .fetch_optional(&db) + .await + { + let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED); + let now = Utc::now(); + + let feed = client.get(&url).send().await; + if let Ok(feed) = feed + && let Ok(feed) = feed.bytes().await + && let Ok(feed) = parse(feed.reader()) + { + for post in feed.entries { + let last_year = now - ONE_YEAR; + if post.published.unwrap_or(last_year) > last_fetched { + let entry = FeedEntry { + url: url.clone(), + title: post + .title + .map(|t| t.content) + .unwrap_or("Blogdor Says: NO POST TITLE".to_string()), + published: post.published.unwrap_or(now), + received: now, + feed_description: feed.description.to_owned().map(|d| d.content), + body: post.content.and_then(|c| c.body), + }; + if let Err(e) = tx.send(entry) { + tracing::error!("error sending feed entry: {e}"); + continue; + }; + // update DB + } + } + } } } diff --git a/src/main.rs b/src/main.rs index 95ba837..deb61a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use blogdor::BlogdorTheAggregator; async fn main() { let bta = BlogdorTheAggregator::new().await; let aggregator_handle = bta.aggregate().await; - let server_handle = bta.listen().await; + let server_handle = bta.listen_http().await; server_handle.await.unwrap_or_default(); aggregator_handle.await.unwrap_or_default(); bta.close_db().await; diff --git a/src/server.rs b/src/server.rs index a2267a1..e1f8323 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,10 @@ use std::net::SocketAddr; -use axum::{routing::post, Router}; +use axum::{Router, routing::post}; use sqlx::SqlitePool; use tokio_util::sync::CancellationToken; -pub async fn spawn_server( +pub(crate) async fn spawn_server( pool: SqlitePool, cancel: CancellationToken, ) -> tokio::task::JoinHandle<()> {