checkpoint

Joe 2025-12-07 22:12:15 -08:00
parent bf11a10077
commit 2cafaee52f
4 changed files with 75 additions and 13 deletions

.rustfmt.toml (new file)

@@ -0,0 +1,4 @@
+imports_granularity = "Crate"
+group_imports = "StdExternalCrate"
+wrap_comments = true
+edition = "2024"
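
Two of these options do the heavy lifting in the diffs below: imports_granularity = "Crate" merges all use items from one crate into a single statement, and group_imports = "StdExternalCrate" sorts imports into std / external / local groups. Both import options are still unstable, so they only take effect on nightly rustfmt (cargo +nightly fmt); stable rustfmt warns and skips them. A sketch of the effect, using imports from this repo (the "before" shape is hypothetical):

    // Before formatting (hypothetical input):
    use sqlx::types::chrono::DateTime;
    use sqlx::types::chrono::Utc;
    use sqlx::SqlitePool;
    use std::time::Duration;

    // After `cargo +nightly fmt` with this config:
    use std::time::Duration;

    use sqlx::{
        SqlitePool,
        types::chrono::{DateTime, Utc},
    };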

src/lib.rs

@@ -1,22 +1,25 @@
 use std::time::Duration;

+use feed_rs::{model::Content, parser::parse};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

 pub mod server;

-use reqwest::Client;
 use sqlx::{
-    sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
-    types::chrono::DateTime,
-    types::chrono::Utc,
     SqlitePool,
+    sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
+    types::chrono::{DateTime, Utc},
 };
 use tokio::task::JoinHandle;
-use tokio_util::sync::CancellationToken;
+use tokio_util::{bytes::Buf, sync::CancellationToken};

 const MAX_CONNS: u32 = 200;
 const MIN_CONNS: u32 = 5;
 const TIMEOUT: u64 = 2000; // in milliseconds
+const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
+const ONE_YEAR: Duration = Duration::from_secs(365 * 24 * 60 * 60);

 pub struct BlogdorTheAggregator {
     db: SqlitePool,
     client: reqwest::Client,
@@ -29,7 +32,7 @@ pub struct FeedEntry {
     title: String,
     published: DateTime<Utc>,
     received: DateTime<Utc>,
-    description: Option<String>,
+    feed_description: Option<String>,
     body: Option<String>,
 }
@@ -44,7 +47,7 @@ impl BlogdorTheAggregator {
             .init();

         let db = get_db_pool().await;
-        let client = reqwest::Client::new();
+        let client = reqwest::Client::new(); // TODO: retries?
         let cancel = CancellationToken::new();

         Self { db, client, cancel }
@@ -70,7 +73,7 @@ impl BlogdorTheAggregator {
         })
     }

-    pub async fn listen(&self) -> JoinHandle<()> {
+    pub async fn listen_http(&self) -> JoinHandle<()> {
         server::spawn_server(self.db.clone(), self.cancel.clone()).await
     }
@@ -79,7 +82,7 @@ impl BlogdorTheAggregator {
     }
 }

-async fn check_feeds(db: &SqlitePool, _client: &Client) {
+async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) {
     tracing::debug!("checking feeds");
     let feeds = match sqlx::query!("select id, url from feeds where active = true")
         .fetch_all(db)
@@ -92,9 +95,64 @@ async fn check_feeds(db: &SqlitePool, _client: &Client) {
         }
     };

+    // a channel to receive feed entries over, from the feed-reading tasks
+    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+
     for feed in feeds {
+        let id = feed.id;
         let url = feed.url;
+        let client = client.clone();
+        let db = db.clone();
+        let tx = tx.clone();
+        tokio::spawn(async move {
+            // pass the bindings, not `feed.id`: moving `feed` into this
+            // async block after `feed.url` was moved out would not compile
+            check_feed(db, id, client, url, tx).await;
+        });
     }
 }
+
+async fn check_feed(
+    db: SqlitePool,
+    id: i64,
+    client: reqwest::Client,
+    url: String,
+    tx: tokio::sync::mpsc::UnboundedSender<FeedEntry>,
+) {
+    if let Ok(rec) = sqlx::query!(
+        "select date_time from runs where succeeded = true and feed = ? order by id desc limit 1",
+        id
+    )
+    .fetch_optional(&db)
+    .await
+    {
+        let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
+        let now = Utc::now();
+        let feed = client.get(&url).send().await;
+        if let Ok(feed) = feed
+            && let Ok(feed) = feed.bytes().await
+            && let Ok(feed) = parse(feed.reader())
+        {
+            for post in feed.entries {
+                let last_year = now - ONE_YEAR;
+                if post.published.unwrap_or(last_year) > last_fetched {
+                    let entry = FeedEntry {
+                        url: url.clone(),
+                        title: post
+                            .title
+                            .map(|t| t.content)
+                            .unwrap_or("Blogdor Says: NO POST TITLE".to_string()),
+                        published: post.published.unwrap_or(now),
+                        received: now,
+                        feed_description: feed.description.to_owned().map(|d| d.content),
+                        body: post.content.and_then(|c| c.body),
+                    };
+                    if let Err(e) = tx.send(entry) {
+                        tracing::error!("error sending feed entry: {e}");
+                        continue;
+                    };
+                    // update DB
+                }
+            }
+        }
+    }
+}
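
A loose end in this checkpoint: check_feeds creates the channel and hands a tx clone to every per-feed task, but nothing drains rx yet (hence the unused mut rx), and the "update DB" step in check_feed is still a stub. A minimal consumer sketch for the tail of check_feeds; the posts table and its columns are assumptions for illustration, not defined anywhere in this commit:

    // Hypothetical: drain the channel and persist entries. The `posts`
    // table and its column names are assumed, not part of this commit.
    drop(tx); // drop our sender so recv() sees the channel close once tasks finish
    while let Some(entry) = rx.recv().await {
        if let Err(e) = sqlx::query!(
            "insert into posts (url, title, published, received, feed_description, body)
             values (?, ?, ?, ?, ?, ?)",
            entry.url,
            entry.title,
            entry.published,
            entry.received,
            entry.feed_description,
            entry.body,
        )
        .execute(db)
        .await
        {
            tracing::error!("error inserting feed entry: {e}");
        }
    }

Dropping the local tx matters here: recv() only returns None once every clone held by a spawned task has gone out of scope, so without the drop the loop would never terminate.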

src/main.rs

@@ -4,7 +4,7 @@ use blogdor::BlogdorTheAggregator;
 async fn main() {
     let bta = BlogdorTheAggregator::new().await;
     let aggregator_handle = bta.aggregate().await;
-    let server_handle = bta.listen().await;
+    let server_handle = bta.listen_http().await;
     server_handle.await.unwrap_or_default();
     aggregator_handle.await.unwrap_or_default();
     bta.close_db().await;
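
Nothing in this diff shows what trips the CancellationToken that both the server and the aggregator watch. One plausible wiring in main, before awaiting the handles; the cancel_token() accessor is hypothetical, not something BlogdorTheAggregator exposes in this commit:

    // Hypothetical: cancel_token() is an assumed accessor on BlogdorTheAggregator.
    let cancel = bta.cancel_token();
    tokio::spawn(async move {
        // On ctrl-c, flip the token so both tasks can wind down
        // cleanly before close_db() runs.
        if tokio::signal::ctrl_c().await.is_ok() {
            cancel.cancel();
        }
    });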

src/server.rs

@@ -1,10 +1,10 @@
 use std::net::SocketAddr;

-use axum::{routing::post, Router};
+use axum::{Router, routing::post};
 use sqlx::SqlitePool;
 use tokio_util::sync::CancellationToken;

-pub async fn spawn_server(
+pub(crate) async fn spawn_server(
     pool: SqlitePool,
     cancel: CancellationToken,
 ) -> tokio::task::JoinHandle<()> {