more stuff

This commit is contained in:
Joe 2025-12-28 22:08:23 -08:00
parent 413dc6ab9a
commit b3cdf9f8ef
4 changed files with 78 additions and 50 deletions

2
.gitignore vendored
View file

@ -2,3 +2,5 @@
blogdor.db
secrets
.env
.#*
dump.sql

View file

@ -33,25 +33,33 @@ impl BlogdorTheAggregator {
self.db.close().await;
}
pub async fn db_action<'q, T>(&self, query: DbAction<'q>) -> Result<DbValue, String> {
pub async fn db_action<'q>(&self, query: DbAction<'q>) -> Result<DbValue, String> {
match query {
DbAction::Execute(q) => {
let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
q.execute(&self.db).await.map_err(|e| format!("{e}"))?;
t.commit().await.map_err(|e| format!("{e}"))?;
Ok(DbValue::None)
}
DbAction::FetchOne(q) => {
let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
let r = q.fetch_one(&self.db).await.map_err(|e| format!("{e}"))?;
t.commit().await.map_err(|e| format!("{e}"))?;
Ok(DbValue::One(r))
}
DbAction::FetchMany(q) => {
let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
let r = q.fetch_all(&self.db).await.map_err(|e| format!("{e}"))?;
t.commit().await.map_err(|e| format!("{e}"))?;
Ok(DbValue::Many(r))
}
DbAction::FetchOptional(q) => {
let t = self.db.begin().await.map_err(|e| format!("{e}"))?;
let r = q
.fetch_optional(&self.db)
.await
.map_err(|e| format!("{e}"))?;
t.commit().await.map_err(|e| format!("{e}"))?;
Ok(DbValue::Optional(r))
}
}

View file

@ -2,9 +2,10 @@ use std::time::Duration;
use feed_rs::parser::parse;
use reqwest::{Client, Response, StatusCode};
use serde::{Deserialize, Serialize};
use server::ServerState;
use sqlx::{
SqlitePool,
FromRow, Row, SqlitePool,
types::chrono::{DateTime, Utc},
};
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
@ -12,7 +13,7 @@ use tokio_util::{bytes::Buf, sync::CancellationToken};
use unicode_segmentation::UnicodeSegmentation;
mod db;
use db::DbAction;
use db::{DbAction, DbValue};
pub mod server;
@ -24,8 +25,11 @@ const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);
const ADD_FEED_QUERY: &str = "";
const ACTIVE_FEEDS_QUERY: &str = "select id, url from feeds where active = true";
const STALE_FEEDS_QUERY: &str = "select id, url, added_by, created_at from feeds";
const ACTIVE_FEEDS_QUERY: &str = "select id, url, created_at from feeds where active = true";
const STALE_FEEDS_QUERY: &str =
"select id, url, added_by, created_at from feeds where active = true";
const FETCH_RUN_QUERY: &str =
"select date_time from successful_runs where feed = ? order by id desc limit 1";
pub struct BlogdorTheAggregator {
db: SqlitePool,
@ -82,6 +86,22 @@ enum MessageType {
Direct,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, FromRow)]
pub struct ActiveFeed {
url: String,
id: i64,
owner: i64,
added: DateTime<Utc>,
#[sqlx(flatten)]
last_run: FeedRun,
}
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, FromRow)]
pub struct FeedRun {
fetched: Option<DateTime<Utc>>,
posted: Option<DateTime<Utc>>,
}
impl BlogdorTheAggregator {
pub async fn new() -> Self {
let db = db::get_db_pool().await;
@ -131,21 +151,36 @@ impl BlogdorTheAggregator {
server::spawn_server(state, self.cancel.clone()).await;
}
pub async fn check_feeds(&self) -> Result<Vec<Result<FeedResult, String>>, String> {
pub async fn check_feeds(&self) -> Result<Vec<FeedResult>, String> {
tracing::debug!("checking feeds");
let feeds = sqlx::query!("select id, url from feeds where active = true")
.fetch_all(&self.db)
let feed_query = sqlx::query(ACTIVE_FEEDS_QUERY);
let feeds = self
.db_action(DbAction::FetchMany(feed_query))
.await
.map_err(|e| format!("{e}"))?;
.map_err(|e| {
tracing::error!("got error getting feeds from DB: {e}");
"couldn't get active feeds".to_string()
})?;
let DbValue::Many(feeds) = feeds else {
unreachable!()
};
let mut handles = JoinSet::new();
for feed in feeds {
handles.spawn(check_feed(
self.db.clone(),
feed.id,
self.client.clone(),
feed.url,
));
let id = feed.get("id");
let url = feed.get("url");
let created_at: DateTime<Utc> = feed.get("created_at");
let last = if let Ok(v) = self
.db_action(DbAction::FetchOne(sqlx::query(FETCH_RUN_QUERY)))
.await
{
let DbValue::One(r) = v else { unreachable!() };
r.get("date_time")
} else {
created_at
};
handles.spawn(check_feed(self.client.clone(), id, url, last));
}
let mut feed_results = Vec::new();
@ -155,6 +190,12 @@ impl BlogdorTheAggregator {
tracing::error!("got join error: {e}");
continue;
};
let Ok(feed_result) = feed_result else {
let e = feed_result.unwrap_err();
tracing::error!("got error fetching feed: {e}");
continue;
};
//self.db_action(DbAction::Execute(sqlx::query("insert into succ")));
feed_results.push(feed_result);
}
@ -334,32 +375,17 @@ impl Posted for feed_rs::model::Entry {
// takes args by value because it's meant to be called from inside a spawned
// tokio task scope
async fn check_feed(
db: SqlitePool,
feed_id: i64,
client: reqwest::Client,
feed_id: i64,
url: String,
last_fetched: DateTime<Utc>,
) -> Result<FeedResult, String> {
let rec = sqlx::query!(
"select date_time from successful_runs where feed = ? order by id desc limit 1",
feed_id
)
.fetch_optional(&db)
.await
.map_err(|e| format!("Could not fetch runs for {url} from DB, got {e}"))?;
tracing::debug!("checking {url}");
let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
let now = Utc::now();
let mut feed = fetch_and_parse_feed(&url, &client).await?;
if let Err(e) = sqlx::query!("insert into fetches (feed) values (?)", feed_id)
.execute(&db)
.await
{
tracing::error!("got error inserting {feed_id} into fetches: {e}");
}
let mut entries = None;
feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
for post in feed.entries.into_iter().take(5) {

View file

@ -47,23 +47,15 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver
match bta.check_feeds().await {
Ok(results) => {
for result in results {
match result {
Ok(result) => {
if let Some(ref posts) = result.entries {
tracing::debug!(
"got {} new posts from {}",
posts.len(),
result.url
);
bta.post_entries(posts).await;
} else {
tracing::debug!("no new posts from {}", result.url);
}
},
// inner error for singular feed
Err(e) => {
tracing::warn!("could not check feed: {e}");
},
if let Some(ref posts) = result.entries {
tracing::debug!(
"got {} new posts from {}",
posts.len(),
result.url
);
bta.post_entries(posts).await;
} else {
tracing::debug!("no new posts from {}", result.url);
}
}
},