From d3aa7964128eb94b2376ea6f9f91e5d68040d8e3 Mon Sep 17 00:00:00 2001
From: joe
Date: Sat, 13 Dec 2025 11:15:09 -0800
Subject: [PATCH] ready to get fetchin'

---
 src/lib.rs | 96 +++++++++++++++++++++++++++++++-----------------------
 1 file changed, 55 insertions(+), 41 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index f019e45..17841df 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,6 @@
 use std::time::Duration;
 
-use feed_rs::{model::Content, parser::parse};
+use feed_rs::parser::parse;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 
 pub mod server;
@@ -10,7 +10,7 @@ use sqlx::{
     sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
     types::chrono::{DateTime, Utc},
 };
-use tokio::task::JoinHandle;
+use tokio::task::{JoinHandle, JoinSet};
 use tokio_util::{bytes::Buf, sync::CancellationToken};
 
 const MAX_CONNS: u32 = 200;
@@ -96,17 +96,27 @@ async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) {
         }
     };
 
-    // a channel to receive feed entries over, from the feed-reading tasks
-    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
-
+    let mut handles = JoinSet::new();
     for feed in feeds {
-        let url = feed.url;
-        let client = client.clone();
-        let db = db.clone();
-        let tx = tx.clone();
-        tokio::spawn(async move {
-            check_feed(db, feed.id, client, url, tx).await;
-        });
+        handles.spawn(check_feed(db.clone(), feed.id, client.clone(), feed.url));
+    }
+    while let Some(posts) = handles.join_next().await {
+        let Ok(posts) = posts else {
+            let e = posts.unwrap_err();
+            tracing::error!("got join error: {e}");
+            continue;
+        };
+        match posts {
+            Err(s) => {
+                tracing::warn!("could not fetch feed: {s}")
+            }
+            Ok(posts) => {
+                // send to zulip
+                for post in posts {
+                    tracing::debug!("{post:?}");
+                }
+            }
+        }
     }
 }
 
@@ -115,7 +125,6 @@ async fn check_feed(
     feed_id: i64,
     client: reqwest::Client,
     url: String,
-    tx: tokio::sync::mpsc::UnboundedSender<FeedEntry>,
 ) -> Result<Vec<FeedEntry>, String> {
     let rec = sqlx::query!(
         "select date_time from runs where succeeded = true and feed = ? order by id desc limit 1",
@@ -128,36 +137,41 @@ async fn check_feed(
     let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
     let now = Utc::now();
     let mut out = Vec::new();
-    let feed = client.get(&url).send().await;
-    if let Ok(feed) = feed
-        && let Ok(feed) = feed.bytes().await
-        && let Ok(feed) = parse(feed.reader())
-    {
-        for post in feed.entries {
-            let last_year = now - ONE_YEAR;
-            if post.published.unwrap_or(last_year) > last_fetched {
-                let entry = FeedEntry {
-                    url: post
-                        .links
-                        .first()
-                        .cloned()
-                        .map(|l| l.href)
-                        .unwrap_or("".to_string()),
-                    feed_id,
-                    title: post
-                        .title
-                        .map(|t| t.content)
-                        .unwrap_or("Blogdor Says: NO POST TITLE".to_string()),
-                    published: post.published.unwrap_or(now),
-                    received: now,
-                    feed_description: feed.description.to_owned().map(|d| d.content),
-                    body: post.content.and_then(|c| c.body),
-                };
-                out.push(entry);
-            }
+    let feed = client
+        .get(&url)
+        .send()
+        .await
+        .map_err(|e| format!("could not get feed from {url}, got {e}"))?
+        .bytes()
+        .await
+        .map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
+    let feed =
+        parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;
+    for post in feed.entries {
+        let last_year = now - ONE_YEAR;
+        if post.published.unwrap_or(last_year) > last_fetched {
+            let entry = FeedEntry {
+                url: post
+                    .links
+                    .first()
+                    .cloned()
+                    .map(|l| l.href)
+                    .unwrap_or("".to_string()),
+                feed_id,
+                title: post
+                    .title
+                    .map(|t| t.content)
+                    .unwrap_or("Blogdor Says: NO POST TITLE".to_string()),
+                published: post.published.unwrap_or(now),
+                received: now,
+                feed_description: feed.description.to_owned().map(|d| d.content),
+                body: post.content.and_then(|c| c.body),
+            };
+            out.push(entry);
         }
     }
-    todo!()
+
+    Ok(out)
 }
 
 async fn get_db_pool() -> SqlitePool {
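
Note, not part of the patch itself: a minimal, self-contained sketch of the JoinSet fan-out/fan-in pattern the new check_feeds uses — spawn one task per feed, then drain completions with join_next, keeping join failures (panics or cancellation) separate from a task's own Err. The task bodies, counts, and messages below are placeholders, not code from this repository; it assumes a tokio dependency with the "macros" and "rt-multi-thread" features.

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut handles = JoinSet::new();
    for i in 0..3u32 {
        // Each task returns Result<Vec<String>, String>, mirroring check_feed's
        // signature; one placeholder task fails to show the error path.
        handles.spawn(async move {
            if i == 1 {
                Err(format!("feed {i} unreachable"))
            } else {
                Ok(vec![format!("post from feed {i}")])
            }
        });
    }
    // join_next().await yields None once every spawned task has finished.
    // The outer Result is Err if the task panicked or was cancelled; the
    // inner Result is whatever the task itself returned.
    while let Some(res) = handles.join_next().await {
        match res {
            Err(e) => eprintln!("join error: {e}"),
            Ok(Err(s)) => eprintln!("could not fetch feed: {s}"),
            Ok(Ok(posts)) => {
                for post in posts {
                    println!("{post}");
                }
            }
        }
    }
}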