diff --git a/.gitignore b/.gitignore index 376bb46..065223a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ blogdor.db secrets .env +.#* +dump.sql diff --git a/src/db.rs b/src/db.rs index 5297285..9fbdfe9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -33,25 +33,33 @@ impl BlogdorTheAggregator { self.db.close().await; } - pub async fn db_action<'q, T>(&self, query: DbAction<'q>) -> Result { + pub async fn db_action<'q>(&self, query: DbAction<'q>) -> Result { match query { DbAction::Execute(q) => { + let t = self.db.begin().await.map_err(|e| format!("{e}"))?; q.execute(&self.db).await.map_err(|e| format!("{e}"))?; + t.commit().await.map_err(|e| format!("{e}"))?; Ok(DbValue::None) } DbAction::FetchOne(q) => { + let t = self.db.begin().await.map_err(|e| format!("{e}"))?; let r = q.fetch_one(&self.db).await.map_err(|e| format!("{e}"))?; + t.commit().await.map_err(|e| format!("{e}"))?; Ok(DbValue::One(r)) } DbAction::FetchMany(q) => { + let t = self.db.begin().await.map_err(|e| format!("{e}"))?; let r = q.fetch_all(&self.db).await.map_err(|e| format!("{e}"))?; + t.commit().await.map_err(|e| format!("{e}"))?; Ok(DbValue::Many(r)) } DbAction::FetchOptional(q) => { + let t = self.db.begin().await.map_err(|e| format!("{e}"))?; let r = q .fetch_optional(&self.db) .await .map_err(|e| format!("{e}"))?; + t.commit().await.map_err(|e| format!("{e}"))?; Ok(DbValue::Optional(r)) } } diff --git a/src/lib.rs b/src/lib.rs index b72c744..664c203 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,10 @@ use std::time::Duration; use feed_rs::parser::parse; use reqwest::{Client, Response, StatusCode}; +use serde::{Deserialize, Serialize}; use server::ServerState; use sqlx::{ - SqlitePool, + FromRow, Row, SqlitePool, types::chrono::{DateTime, Utc}, }; use tokio::{sync::mpsc::UnboundedSender, task::JoinSet}; @@ -12,7 +13,7 @@ use tokio_util::{bytes::Buf, sync::CancellationToken}; use unicode_segmentation::UnicodeSegmentation; mod db; -use db::DbAction; +use db::{DbAction, DbValue}; pub mod server; @@ -24,8 +25,11 @@ const LAST_FETCHED: DateTime = DateTime::from_timestamp_nanos(0); const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24); const ADD_FEED_QUERY: &str = ""; -const ACTIVE_FEEDS_QUERY: &str = "select id, url from feeds where active = true"; -const STALE_FEEDS_QUERY: &str = "select id, url, added_by, created_at from feeds"; +const ACTIVE_FEEDS_QUERY: &str = "select id, url, created_at from feeds where active = true"; +const STALE_FEEDS_QUERY: &str = + "select id, url, added_by, created_at from feeds where active = true"; +const FETCH_RUN_QUERY: &str = + "select date_time from successful_runs where feed = ? order by id desc limit 1"; pub struct BlogdorTheAggregator { db: SqlitePool, @@ -82,6 +86,22 @@ enum MessageType { Direct, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, FromRow)] +pub struct ActiveFeed { + url: String, + id: i64, + owner: i64, + added: DateTime, + #[sqlx(flatten)] + last_run: FeedRun, +} + +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, FromRow)] +pub struct FeedRun { + fetched: Option>, + posted: Option>, +} + impl BlogdorTheAggregator { pub async fn new() -> Self { let db = db::get_db_pool().await; @@ -131,21 +151,36 @@ impl BlogdorTheAggregator { server::spawn_server(state, self.cancel.clone()).await; } - pub async fn check_feeds(&self) -> Result>, String> { + pub async fn check_feeds(&self) -> Result, String> { tracing::debug!("checking feeds"); - let feeds = sqlx::query!("select id, url from feeds where active = true") - .fetch_all(&self.db) + let feed_query = sqlx::query(ACTIVE_FEEDS_QUERY); + let feeds = self + .db_action(DbAction::FetchMany(feed_query)) .await - .map_err(|e| format!("{e}"))?; + .map_err(|e| { + tracing::error!("got error getting feeds from DB: {e}"); + "couldn't get active feeds".to_string() + })?; + let DbValue::Many(feeds) = feeds else { + unreachable!() + }; let mut handles = JoinSet::new(); for feed in feeds { - handles.spawn(check_feed( - self.db.clone(), - feed.id, - self.client.clone(), - feed.url, - )); + let id = feed.get("id"); + let url = feed.get("url"); + let created_at: DateTime = feed.get("created_at"); + let last = if let Ok(v) = self + .db_action(DbAction::FetchOne(sqlx::query(FETCH_RUN_QUERY))) + .await + { + let DbValue::One(r) = v else { unreachable!() }; + r.get("date_time") + } else { + created_at + }; + + handles.spawn(check_feed(self.client.clone(), id, url, last)); } let mut feed_results = Vec::new(); @@ -155,6 +190,12 @@ impl BlogdorTheAggregator { tracing::error!("got join error: {e}"); continue; }; + let Ok(feed_result) = feed_result else { + let e = feed_result.unwrap_err(); + tracing::error!("got error fetching feed: {e}"); + continue; + }; + //self.db_action(DbAction::Execute(sqlx::query("insert into succ"))); feed_results.push(feed_result); } @@ -334,32 +375,17 @@ impl Posted for feed_rs::model::Entry { // takes args by value because it's meant to be called from inside a spawned // tokio task scope async fn check_feed( - db: SqlitePool, - feed_id: i64, client: reqwest::Client, + feed_id: i64, url: String, + last_fetched: DateTime, ) -> Result { - let rec = sqlx::query!( - "select date_time from successful_runs where feed = ? order by id desc limit 1", - feed_id - ) - .fetch_optional(&db) - .await - .map_err(|e| format!("Could not fetch runs for {url} from DB, got {e}"))?; - tracing::debug!("checking {url}"); - let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED); + let now = Utc::now(); let mut feed = fetch_and_parse_feed(&url, &client).await?; - if let Err(e) = sqlx::query!("insert into fetches (feed) values (?)", feed_id) - .execute(&db) - .await - { - tracing::error!("got error inserting {feed_id} into fetches: {e}"); - } - let mut entries = None; feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted())); for post in feed.entries.into_iter().take(5) { diff --git a/src/main.rs b/src/main.rs index f5404e3..5b12cad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,23 +47,15 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver match bta.check_feeds().await { Ok(results) => { for result in results { - match result { - Ok(result) => { - if let Some(ref posts) = result.entries { - tracing::debug!( - "got {} new posts from {}", - posts.len(), - result.url - ); - bta.post_entries(posts).await; - } else { - tracing::debug!("no new posts from {}", result.url); - } - }, - // inner error for singular feed - Err(e) => { - tracing::warn!("could not check feed: {e}"); - }, + if let Some(ref posts) = result.entries { + tracing::debug!( + "got {} new posts from {}", + posts.len(), + result.url + ); + bta.post_entries(posts).await; + } else { + tracing::debug!("no new posts from {}", result.url); } } },