add stale feed check and nag
This commit is contained in:
parent
019aafa647
commit
e7f0a58c8a
4 changed files with 91 additions and 5 deletions
1
migrations/0004_fetches.down.sql
Normal file
1
migrations/0004_fetches.down.sql
Normal file
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE IF EXISTS fetches;
|
||||
6
migrations/0004_fetches.up.sql
Normal file
6
migrations/0004_fetches.up.sql
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
CREATE TABLE IF NOT EXISTS fetches (
|
||||
id INTEGER PRIMARY KEY,
|
||||
feed INT NOT NULL,
|
||||
fetched DATETIME NOT NULL DEFAULT current_timestamp,
|
||||
FOREIGN KEY (feed) REFERENCES feeds(id)
|
||||
);
|
||||
80
src/lib.rs
80
src/lib.rs
|
|
@ -23,6 +23,8 @@ const ZULIP_MESSAGE_CUTOFF: usize = 700;
|
|||
|
||||
const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
|
||||
|
||||
const STALE_FETCH_THRESHOLD: Duration = Duration::from_hours(24);
|
||||
|
||||
pub struct BlogdorTheAggregator {
|
||||
db: SqlitePool,
|
||||
client: reqwest::Client,
|
||||
|
|
@ -64,12 +66,20 @@ pub struct NewFeed {
|
|||
struct ZulipMessage<'s> {
|
||||
to: u32,
|
||||
#[serde(rename = "type")]
|
||||
typ: &'s str,
|
||||
typ: MessageType,
|
||||
content: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
topic: Option<&'s str>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum MessageType {
|
||||
#[default]
|
||||
Stream,
|
||||
Direct,
|
||||
}
|
||||
|
||||
impl BlogdorTheAggregator {
|
||||
pub async fn new() -> Self {
|
||||
let db = get_db_pool().await;
|
||||
|
|
@ -144,11 +154,67 @@ impl BlogdorTheAggregator {
|
|||
Ok(feed_results)
|
||||
}
|
||||
|
||||
pub async fn check_stale(&self) {
|
||||
let feeds = match sqlx::query!("select id, url, added_by, created_at from feeds")
|
||||
.fetch_all(&self.db)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
tracing::error!("could not fetch feeds: {e}");
|
||||
return;
|
||||
}
|
||||
Ok(f) => f,
|
||||
};
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
for feed in feeds.into_iter() {
|
||||
let id = feed.id;
|
||||
let url = &feed.url;
|
||||
let user = feed.added_by;
|
||||
let fetched = match sqlx::query!(
|
||||
"select fetched from fetches where feed = ? order by id desc limit 1",
|
||||
id
|
||||
)
|
||||
.fetch_optional(&self.db)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
tracing::error!("could not get last fetched for {url} from db: {e}");
|
||||
continue;
|
||||
}
|
||||
Ok(f) => f,
|
||||
};
|
||||
let dur = if let Some(fetched) = fetched {
|
||||
now - fetched.fetched.and_utc()
|
||||
} else {
|
||||
now - feed.created_at.and_utc()
|
||||
};
|
||||
|
||||
if dur.num_seconds() > STALE_FETCH_THRESHOLD.as_secs() as i64 {
|
||||
let hours = dur.num_hours() % 24;
|
||||
let days = dur.num_days();
|
||||
let content = format!(
|
||||
"It has been {days} days and {hours} hours since Blogdor was able to check your feed at {url}. If you'd like to remove it, you can DM @blogdor's manager with 'remove {url}'."
|
||||
);
|
||||
let msg = ZulipMessage {
|
||||
to: user as u32,
|
||||
typ: MessageType::Direct,
|
||||
content,
|
||||
topic: None,
|
||||
};
|
||||
if let Err(e) = self.send_zulip_message(&msg).await {
|
||||
tracing::error!("error sending zulip message to user {user}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn announce_feed(&self, announce: &NewFeed) {
|
||||
let content = format!("{} added a new feed: {}", announce.user, announce.feed);
|
||||
let msg = ZulipMessage {
|
||||
to: self.channel_id,
|
||||
typ: "stream",
|
||||
typ: MessageType::Stream,
|
||||
content,
|
||||
topic: Some("New feeds"),
|
||||
};
|
||||
|
|
@ -199,7 +265,7 @@ impl BlogdorTheAggregator {
|
|||
|
||||
let msg = ZulipMessage {
|
||||
to: self.channel_id,
|
||||
typ: "stream",
|
||||
typ: MessageType::Stream,
|
||||
content,
|
||||
topic: Some(title),
|
||||
};
|
||||
|
|
@ -290,6 +356,14 @@ async fn check_feed(
|
|||
.map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
|
||||
let mut feed =
|
||||
parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;
|
||||
|
||||
if let Err(e) = sqlx::query!("insert into fetches (feed) values (?)", feed_id)
|
||||
.execute(&db)
|
||||
.await
|
||||
{
|
||||
tracing::error!("got error inserting {feed_id} into fetches: {e}");
|
||||
}
|
||||
|
||||
feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
|
||||
for post in feed.entries.into_iter().take(5) {
|
||||
if post.posted().unwrap_or(LAST_FETCHED) > last_fetched {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,9 @@ fn init_logs() {
|
|||
}
|
||||
|
||||
async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
|
||||
let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE);
|
||||
let mut check_feeds = tokio::time::interval(BLOGDOR_SNOOZE);
|
||||
let mut check_stale = tokio::time::interval(Duration::from_hours(24));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
|
@ -41,7 +43,7 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver
|
|||
bta.announce_feed(&announce).await;
|
||||
}
|
||||
}
|
||||
_ = alarm.tick() => {
|
||||
_ = check_feeds.tick() => {
|
||||
match bta.check_feeds().await {
|
||||
Ok(results) => {
|
||||
for result in results {
|
||||
|
|
@ -71,6 +73,9 @@ async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver
|
|||
}
|
||||
}
|
||||
}
|
||||
_ = check_stale.tick() => {
|
||||
bta.check_stale().await;
|
||||
}
|
||||
_ = bta.cancelled() => {
|
||||
tracing::info!("shutting down the aggregation loop");
|
||||
break;
|
||||
|
|
|
|||
Loading…
Reference in a new issue