ready to get fetchin'

This commit is contained in:
joe 2025-12-13 11:15:09 -08:00
parent 72229bf073
commit d3aa796412

View file

@ -1,6 +1,6 @@
use std::time::Duration; use std::time::Duration;
use feed_rs::{model::Content, parser::parse}; use feed_rs::parser::parse;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
pub mod server; pub mod server;
@ -10,7 +10,7 @@ use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
types::chrono::{DateTime, Utc}, types::chrono::{DateTime, Utc},
}; };
use tokio::task::JoinHandle; use tokio::task::{JoinHandle, JoinSet};
use tokio_util::{bytes::Buf, sync::CancellationToken}; use tokio_util::{bytes::Buf, sync::CancellationToken};
const MAX_CONNS: u32 = 200; const MAX_CONNS: u32 = 200;
@ -96,17 +96,27 @@ async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) {
} }
}; };
// a channel to receive feed entries over, from the feed-reading tasks let mut handles = JoinSet::new();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
for feed in feeds { for feed in feeds {
let url = feed.url; handles.spawn(check_feed(db.clone(), feed.id, client.clone(), feed.url));
let client = client.clone(); }
let db = db.clone(); while let Some(posts) = handles.join_next().await {
let tx = tx.clone(); let Ok(posts) = posts else {
tokio::spawn(async move { let e = posts.unwrap_err();
check_feed(db, feed.id, client, url, tx).await; tracing::error!("got join error: {e}");
}); continue;
};
match posts {
Err(s) => {
tracing::warn!("could not fetch feed: {s}")
}
Ok(posts) => {
// send to zulip
for post in posts {
tracing::debug!("{post:?}");
}
}
}
} }
} }
@ -115,7 +125,6 @@ async fn check_feed(
feed_id: i64, feed_id: i64,
client: reqwest::Client, client: reqwest::Client,
url: String, url: String,
tx: tokio::sync::mpsc::UnboundedSender<FeedEntry>,
) -> Result<Vec<FeedEntry>, String> { ) -> Result<Vec<FeedEntry>, String> {
let rec = sqlx::query!( let rec = sqlx::query!(
"select date_time from runs where succeeded = true and feed = ? order by id desc limit 1", "select date_time from runs where succeeded = true and feed = ? order by id desc limit 1",
@ -128,11 +137,16 @@ async fn check_feed(
let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED); let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
let now = Utc::now(); let now = Utc::now();
let mut out = Vec::new(); let mut out = Vec::new();
let feed = client.get(&url).send().await; let feed = client
if let Ok(feed) = feed .get(&url)
&& let Ok(feed) = feed.bytes().await .send()
&& let Ok(feed) = parse(feed.reader()) .await
{ .map_err(|e| format!("could not get feed from {url}, got {e}"))?
.bytes()
.await
.map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
let feed =
parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;
for post in feed.entries { for post in feed.entries {
let last_year = now - ONE_YEAR; let last_year = now - ONE_YEAR;
if post.published.unwrap_or(last_year) > last_fetched { if post.published.unwrap_or(last_year) > last_fetched {
@ -156,8 +170,8 @@ async fn check_feed(
out.push(entry); out.push(entry);
} }
} }
}
todo!() Ok(out)
} }
async fn get_db_pool() -> SqlitePool { async fn get_db_pool() -> SqlitePool {