tweak the fetching and posting code
parent 4434a31c09
commit b39612e969

5 changed files with 278 additions and 139 deletions
Cargo.lock (generated), 70 changes:
```diff
@@ -89,6 +89,28 @@ dependencies = [
  "windows-sys 0.61.2",
 ]
 
+[[package]]
+name = "async-channel"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
+dependencies = [
+ "concurrent-queue",
+ "event-listener 2.5.3",
+ "futures-core",
+]
+
+[[package]]
+name = "async-trait"
+version = "0.1.89"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
 [[package]]
 name = "atoi"
 version = "2.0.0"
@@ -209,6 +231,7 @@ dependencies = [
  "feed-rs",
  "html2md",
  "justerror",
+ "moro",
  "rand 0.9.2",
  "reqwest",
  "serde",
@@ -535,6 +558,12 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "event-listener"
+version = "2.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
+
 [[package]]
 name = "event-listener"
 version = "5.4.1"
@@ -633,6 +662,21 @@ dependencies = [
  "new_debug_unreachable",
 ]
 
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
 [[package]]
 name = "futures-channel"
 version = "0.3.31"
@@ -677,6 +721,17 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
 
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
 [[package]]
 name = "futures-sink"
 version = "0.3.31"
@@ -695,8 +750,10 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
 dependencies = [
+ "futures-channel",
  "futures-core",
  "futures-io",
+ "futures-macro",
  "futures-sink",
  "futures-task",
  "memchr",
@@ -1379,6 +1436,17 @@ dependencies = [
  "windows-sys 0.61.2",
 ]
 
+[[package]]
+name = "moro"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8472c674b8319e7529bfdb3c51216810e36727be2056136d07130a0b1c132df6"
+dependencies = [
+ "async-channel",
+ "async-trait",
+ "futures",
+]
+
 [[package]]
 name = "native-tls"
 version = "0.2.14"
@@ -2179,7 +2247,7 @@ dependencies = [
  "crc",
  "crossbeam-queue",
  "either",
- "event-listener",
+ "event-listener 5.4.1",
  "futures-core",
  "futures-intrusive",
  "futures-io",
```
Cargo.toml:

```diff
@@ -10,6 +10,7 @@ clap = { version = "4.5.53", features = ["derive"] }
 feed-rs = { version = "2.3.1", features = ["sanitize"] }
 html2md = "0.2.15"
 justerror = "1.1.0"
+moro = "0.4.0"
 reqwest = "0.12.24"
 serde = { version = "1.0.228", features = ["derive"] }
 serde_urlencoded = "0.7.1"
```
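A note on the new dependency: moro provides structured-concurrency scopes for async Rust. Unlike `tokio::spawn`, which requires `'static` futures, a task spawned inside `moro::async_scope!` may borrow from the enclosing stack frame, which is what lets the new `run_loop` in src/main.rs (below) spawn a task that borrows `&BlogdorTheAggregator`. A minimal sketch of the pattern; the function and variable names here are illustrative, not from the commit:

```rust
// Minimal sketch of moro 0.4's scoped-spawn pattern; `shared` is illustrative.
async fn scoped_demo() {
    let shared = String::from("borrowed, not moved");
    moro::async_scope!(|scope| {
        // The spawned future may borrow `shared`: the scope's future does not
        // resolve until every task spawned inside it has completed.
        scope.spawn(async {
            println!("task sees: {shared}");
        });
    })
    .await;
}
```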
SQL migration (down-migration; filename not captured):

```diff
@@ -1 +1 @@
-DROP TABLE IF EXISTS runs;
+DROP TABLE IF EXISTS successful_runs;
```
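The down-migration previously dropped a table named `runs`, but the code reads and writes `successful_runs`; this change brings the two back in sync. For reference, here is the insert that src/lib.rs (below) runs against that table, rewritten as a self-contained sketch using sqlx's runtime-checked API rather than the `sqlx::query!` macro the real code uses. The schema in the comment is an assumption, since the up-migration is not part of this commit; the chrono types come through sqlx's chrono feature, which lib.rs already uses via `types::chrono`.

```rust
use sqlx::SqlitePool;
use sqlx::types::chrono::{DateTime, Utc};

// Assumed shape of successful_runs (the up-migration is not shown here):
// id INTEGER PRIMARY KEY, feed INTEGER, date_time DATETIME.
async fn record_run(
    db: &SqlitePool,
    feed_id: i64,
    received: DateTime<Utc>,
) -> Result<(), sqlx::Error> {
    sqlx::query("insert into successful_runs (feed, date_time) values (?, ?)")
        .bind(feed_id)
        .bind(received)
        .execute(db)
        .await?;
    Ok(())
}
```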
src/lib.rs, 179 changes:
```diff
@@ -7,7 +7,7 @@ use sqlx::{
     sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
     types::chrono::{DateTime, Utc},
 };
-use tokio::task::{JoinHandle, JoinSet};
+use tokio::task::JoinSet;
 use tokio_util::{bytes::Buf, sync::CancellationToken};
 use unicode_segmentation::UnicodeSegmentation;
 
@@ -17,6 +17,9 @@ const MAX_CONNS: u32 = 200;
 const MIN_CONNS: u32 = 5;
 const TIMEOUT: u64 = 2000; // in milliseconds
 
+const ZULIP_INTERVAL: Duration = Duration::from_millis(250);
+const ZULIP_MESSAGE_CUTOFF: usize = 700;
+
 const LAST_FETCHED: DateTime<Utc> = DateTime::from_timestamp_nanos(0);
 const ONE_YEAR: Duration = Duration::from_secs(365 * 24 * 60 * 60);
```
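One behavioral detail worth noting for the hourly loop that consumes these constants: `tokio::time::interval` completes its first tick immediately, so the feed check runs once right at startup and then on the configured cadence. A small self-contained demonstration:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // tokio::time::interval fires its first tick at once, then waits the
    // full period between subsequent ticks.
    let mut alarm = tokio::time::interval(Duration::from_secs(1));
    alarm.tick().await; // completes immediately
    alarm.tick().await; // completes roughly one second later
    println!("two ticks observed");
}
```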
```diff
@@ -24,11 +27,16 @@ pub struct BlogdorTheAggregator {
     db: SqlitePool,
     client: reqwest::Client,
     cancel: CancellationToken,
+    endpoint: String,
+    channel_id: u32,
+    email: String,
+    password: String,
 }
 
 #[derive(Debug, Default, Clone, PartialEq, Eq)]
 pub struct FeedEntry {
-    url: String,
+    post_url: String,
+    feed_url: String,
     feed_id: i64,
     title: String,
     published: DateTime<Utc>,
@@ -37,7 +45,14 @@ pub struct FeedEntry {
     body: Option<String>,
 }
 
-#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+pub struct FeedResult {
+    pub entries: Option<Vec<FeedEntry>>,
+    pub url: String,
+    pub feed_id: i64,
+}
+
+#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
 struct ZulipMessage<'s> {
     to: u32,
     #[serde(rename = "type")]
```
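ZulipMessage drops its `serde::Deserialize` derive here and keeps only `Serialize`, which is all the send path needs: the struct is form-encoded with serde_urlencoded before being POSTed (see the posting code further down). A self-contained sketch of that encoding; the field types are inferred from how the diff uses them, and the values are placeholders:

```rust
// Sketch of how a struct shaped like ZulipMessage form-encodes. The
// #[serde(rename = "type")] attribute maps the `typ` field to the reserved
// word "type" in the encoded output; a None topic would simply be omitted.
#[derive(serde::Serialize)]
struct ZulipMessage<'s> {
    to: u32,
    #[serde(rename = "type")]
    typ: &'s str,
    content: String,
    topic: Option<&'s str>,
}

fn main() {
    let msg = ZulipMessage {
        to: 42, // placeholder channel id
        typ: "stream",
        content: "hello".to_string(),
        topic: Some("a post title"),
    };
    // Prints: to=42&type=stream&content=hello&topic=a+post+title
    println!("{}", serde_urlencoded::to_string(&msg).expect("serialize msg"));
}
```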
```diff
@@ -52,52 +67,6 @@ impl BlogdorTheAggregator {
         let db = get_db_pool().await;
         let client = reqwest::Client::new(); // TODO: retries?
         let cancel = CancellationToken::new();
 
-        Self { db, client, cancel }
-    }
-
-    pub async fn aggregate(&self) -> JoinHandle<()> {
-        let db = self.db.clone();
-        let client = self.client.clone();
-        let cancel = self.cancel.clone();
-        tokio::task::spawn(async move {
-            let mut alarm = tokio::time::interval(Duration::from_hours(1));
-            loop {
-                tokio::select! {
-                    _ = alarm.tick() => {
-                        check_feeds(&db, &client).await;
-                    }
-                    _ = cancel.cancelled() => {
-                        tracing::info!("shutting down the aggregation loop");
-                        break;
-                    }
-                }
-            }
-        })
-    }
-
-    pub async fn listen_http(&self) -> JoinHandle<()> {
-        server::spawn_server(self.db.clone(), self.cancel.clone()).await
-    }
-
-    pub async fn close_db(&self) {
-        self.db.close().await;
-    }
-}
-
-async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) {
-    tracing::debug!("checking feeds");
-    let feeds = match sqlx::query!("select id, url from feeds where active = true")
-        .fetch_all(db)
-        .await
-    {
-        Ok(feeds) => feeds,
-        Err(e) => {
-            tracing::error!("got error fetching feeds from db: {e}");
-            return;
-        }
-    };
-
         let endpoint = std::env::var("ZULIP_URL").expect("ZULIP_URL must be set");
         let channel_id: u32 = std::env::var("ZULIP_CHANNEL")
             .expect("ZULIP_CHANNEL must be set")
@@ -107,23 +76,59 @@ async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) {
         let email = std::env::var("BLOGDOR_EMAIL").expect("BLOGDOR_EMAIL must be set");
         let password = std::env::var("ZULIP_TOKEN").expect("ZULIP_TOKEN must be set");
 
+        Self {
+            db,
+            client,
+            cancel,
+            endpoint,
+            channel_id,
+            email,
+            password,
+        }
+    }
+
+    pub async fn cancelled(&self) {
+        self.cancel.cancelled().await
+    }
+
+    pub async fn spawn_http(&self) {
+        server::spawn_server(self.db.clone(), self.cancel.clone()).await;
+    }
+
+    pub async fn check_feeds(&self) -> Result<Vec<Result<FeedResult, String>>, String> {
+        tracing::debug!("checking feeds");
+        let feeds = sqlx::query!("select id, url from feeds where active = true")
+            .fetch_all(&self.db)
+            .await
+            .map_err(|e| format!("{e}"))?;
+
         let mut handles = JoinSet::new();
         for feed in feeds {
-            handles.spawn(check_feed(db.clone(), feed.id, client.clone(), feed.url));
+            handles.spawn(check_feed(
+                self.db.clone(),
+                feed.id,
+                self.client.clone(),
+                feed.url,
+            ));
         }
-        while let Some(posts) = handles.join_next().await {
-            let Ok(posts) = posts else {
-                let e = posts.unwrap_err();
+        let mut feed_results = Vec::new();
+        while let Some(feed_result) = handles.join_next().await {
+            let Ok(feed_result) = feed_result else {
+                let e = feed_result.unwrap_err();
                 tracing::error!("got join error: {e}");
                 continue;
             };
-            match posts {
-                Err(s) => {
-                    tracing::warn!("could not fetch feed: {s}")
-                }
-                Ok(None) => {}
-                Ok(Some(posts)) => {
-                    let FeedEntry { feed_id, .. } = posts.last().unwrap();
+            feed_results.push(feed_result);
+        }
+        Ok(feed_results)
+    }
+
+    pub async fn post_entries(&self, posts: &[FeedEntry]) {
+        let FeedEntry {
+            feed_id, received, ..
+        } = posts.last().unwrap();
         let mut success = true;
         for post in posts.iter() {
             let body = post
```
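The refactor above replaces the fire-and-forget `check_feeds` free function with a method returning `Result<Vec<Result<FeedResult, String>>, String>`. The nesting is deliberate: the outer Result fails only when the feeds query itself fails, while each inner Result reports one feed's outcome, so a single bad feed no longer hides the others. A self-contained sketch of consuming that shape; FeedResult is reduced here to the fields a caller inspects:

```rust
// Stand-in for the FeedResult in this diff, reduced for the sketch:
// entries is None when a feed produced nothing new.
struct FeedResult {
    entries: Option<Vec<String>>, // the real type holds Vec<FeedEntry>
    url: String,
}

fn summarize(outcome: Result<Vec<Result<FeedResult, String>>, String>) {
    match outcome {
        // Outer Err: the feeds query itself failed, nothing was fetched.
        Err(e) => eprintln!("could not check feeds: {e}"),
        Ok(results) => {
            for result in results {
                match result {
                    // Inner Err: one feed failed; the others still ran.
                    Err(e) => eprintln!("could not check feed: {e}"),
                    Ok(feed) => match &feed.entries {
                        Some(posts) => println!("{}: {} new posts", feed.url, posts.len()),
                        None => println!("{}: no new posts", feed.url),
                    },
                }
            }
        }
    }
}

fn main() {
    summarize(Ok(vec![
        Ok(FeedResult {
            entries: Some(vec!["a post".into()]),
            url: "https://example.com/feed.xml".into(),
        }),
        Err("connection timed out".into()),
    ]));
}
```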
```diff
@@ -133,20 +138,21 @@
                 .cloned()
                 .unwrap_or("Blogdor Says: NO BODY!".to_string());
             let content = format!(
-                "{body}\n\n---\noriginally posted to {}, on {}",
-                post.url, post.published
+                "{body} ...\n\n---\noriginally posted to {}, on {}",
+                post.post_url, post.published
             );
             let msg = ZulipMessage {
-                to: channel_id,
+                to: self.channel_id,
                 typ: "stream",
                 content,
                 topic: Some(&post.title),
             };
             let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
 
-            match client
-                .post(&endpoint)
-                .basic_auth(&email, Some(&password))
+            match self
+                .client
+                .post(&self.endpoint)
+                .basic_auth(&self.email, Some(&self.password))
                 .body(msg)
                 .header("Content-Type", "application/x-www-form-urlencoded")
                 .send()
@@ -160,31 +166,39 @@
                 if r.status() == StatusCode::OK {
                     success &= true;
                 } else {
+                    tracing::warn!("did not successfully post to zulip: status {}", r.status());
                     success = false;
                 }
             }
         }
-        tokio::time::sleep(Duration::from_millis(200)).await;
+        tokio::time::sleep(ZULIP_INTERVAL).await;
     }
     if success
-        && let Err(e) =
-            sqlx::query!("insert into successful_runs (feed) values (?)", feed_id)
-                .execute(db)
+        && let Err(e) = sqlx::query!(
+            "insert into successful_runs (feed, date_time) values (?, ?)",
+            feed_id,
+            received
+        )
+        .execute(&self.db)
         .await
     {
         tracing::error!("could not insert run for {feed_id}, got {e}");
     }
-        }
-    }
-}
+
+    pub async fn close_db(&self) {
+        self.db.close().await;
+    }
 }
 
+// takes args by value because it's meant to be called from inside a spawned
+// tokio task scope
 async fn check_feed(
     db: SqlitePool,
     feed_id: i64,
     client: reqwest::Client,
     url: String,
-) -> Result<Option<Vec<FeedEntry>>, String> {
+) -> Result<FeedResult, String> {
     let rec = sqlx::query!(
         "select date_time from successful_runs where feed = ? order by id desc limit 1",
         feed_id
```
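The posting call itself keeps the same shape: a form-urlencoded body sent with HTTP basic auth, which matches Zulip's documented bot authentication (bot email plus API key). A standalone sketch of the same reqwest pattern; the endpoint and credentials here are placeholders, not values from the commit:

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // A form-encoded body; the real code encodes a ZulipMessage struct.
    let body = serde_urlencoded::to_string([("to", "42"), ("type", "stream")])
        .expect("serialize form body");
    let resp = client
        .post("https://chat.example.com/api/v1/messages") // placeholder URL
        .basic_auth("bot@example.com", Some("api-key")) // placeholder creds
        .body(body)
        .header("Content-Type", "application/x-www-form-urlencoded")
        .send()
        .await?;
    println!("status: {}", resp.status());
    Ok(())
}
```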
```diff
@@ -196,7 +210,7 @@ async fn check_feed(
     tracing::debug!("checking {url}");
     let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
     let now = Utc::now();
-    let mut out = Vec::new();
+    let mut entries = None;
     let feed = client
         .get(&url)
         .send()
@@ -211,13 +225,14 @@
         let last_year = now - ONE_YEAR;
         if post.published.unwrap_or(last_year) > last_fetched {
             let entry = FeedEntry {
-                url: post
+                post_url: post
                     .links
                     .first()
                     .cloned()
                     .map(|l| l.href)
-                    .unwrap_or("".to_string()),
+                    .unwrap_or("Blogdor Says: NO POST URL".to_string()),
                 feed_id,
+                feed_url: url.clone(),
                 title: post
                     .title
                     .map(|t| t.content)
```
```diff
@@ -229,23 +244,21 @@
                     c.body.map(|f| {
                         let s = html2md::parse_html(&f)
                             .graphemes(false)
-                            .take(500)
+                            .take(ZULIP_MESSAGE_CUTOFF)
                             .collect::<String>();
                         s.to_string()
                     })
                 }),
             };
-            out.push(entry);
+            entries.get_or_insert(Vec::new()).push(entry);
         }
     }
 
-    if out.is_empty() {
-        tracing::debug!("no new items from {url}");
-        Ok(None)
-    } else {
-        tracing::debug!("found {} new items from {url}", out.len());
-        Ok(Some(out))
-    }
+    Ok(FeedResult {
+        entries,
+        url,
+        feed_id,
+    })
 }
 
 async fn get_db_pool() -> SqlitePool {
```
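A note on the truncation above: the body is cut at ZULIP_MESSAGE_CUTOFF grapheme clusters via unicode-segmentation, not at bytes or chars, so multi-codepoint characters such as accented letters or flag emoji are never split in half. A small self-contained demonstration; this sketch uses extended grapheme clusters, `graphemes(true)`, while the diff uses the legacy boundary rules, `graphemes(false)`:

```rust
use unicode_segmentation::UnicodeSegmentation;

fn main() {
    // The French flag is two code points (regional indicators) but one
    // grapheme cluster, so cluster-based truncation keeps it intact.
    let text = "héllo 🇫🇷 world";
    let cut: String = text.graphemes(true).take(7).collect();
    println!("{cut}"); // prints "héllo 🇫🇷" (7 clusters, space included)
}
```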
src/main.rs, 71 changes:
```diff
@@ -1,8 +1,24 @@
+use std::time::Duration;
+
 use blogdor::BlogdorTheAggregator;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 
+const BLOGDOR_SNOOZE: Duration = Duration::from_hours(1);
+
 #[tokio::main(flavor = "multi_thread")]
 async fn main() {
+    init_logs();
+
+    let bta = BlogdorTheAggregator::new().await;
+    bta.spawn_http().await;
+    run_loop(&bta).await;
+
+    bta.close_db().await;
+
+    tracing::info!("db closed, exiting");
+}
+
+fn init_logs() {
     tracing_subscriber::registry()
         .with(
             tracing_subscriber::EnvFilter::try_from_default_env()
@@ -10,11 +26,52 @@ async fn main() {
         )
         .with(tracing_subscriber::fmt::layer())
         .init();
+}
 
-    let bta = BlogdorTheAggregator::new().await;
-    let aggregator_handle = bta.aggregate().await;
-    let server_handle = bta.listen_http().await;
-    server_handle.await.unwrap_or_default();
-    aggregator_handle.await.unwrap_or_default();
-    bta.close_db().await;
+async fn run_loop(bta: &BlogdorTheAggregator) {
+    let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE);
+    moro::async_scope!(|scope| {
+        scope.spawn(async {
+            loop {
+                tokio::select! {
+                    biased;
+                    _ = alarm.tick() => {
+                        match bta.check_feeds().await {
+                            Ok(results) => {
+                                for result in results {
+                                    match result {
+                                        Ok(result) => {
+                                            if let Some(ref posts) = result.entries {
+                                                tracing::debug!(
+                                                    "got {} new posts from {}",
+                                                    posts.len(),
+                                                    result.url
+                                                );
+                                                bta.post_entries(posts).await;
+                                            } else {
+                                                tracing::debug!("no new posts from {}", result.url);
+                                            }
+                                        },
+                                        // inner error for singular feed
+                                        Err(e) => {
+                                            tracing::warn!("could not check feed: {e}");
+                                        },
+                                    }
+                                }
+                            },
+                            // outer check_feeds error
+                            Err(e) => {
+                                tracing::warn!("could not check feeds: {e}");
+                            }
+                        }
+                    }
+                    _ = bta.cancelled() => {
+                        tracing::info!("shutting down the aggregation loop");
+                        break;
+                    }
+                }
+            }
+        });
+    })
+    .await;
 }
```
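One thing this commit does not show is what trips the CancellationToken that `bta.cancelled()` waits on; presumably the HTTP server side cancels it on shutdown. For reference, a hypothetical wiring (not from this codebase) of ctrl-c to a token, which is the usual pattern with tokio-util:

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let trigger = cancel.clone();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.expect("install ctrl-c handler");
        trigger.cancel(); // wakes every clone's .cancelled() future
    });
    cancel.cancelled().await;
    println!("shutting down");
}
```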