start removing the feed entry channel
This commit is contained in:
parent
2cafaee52f
commit
72229bf073
1 changed files with 36 additions and 32 deletions
30
src/lib.rs
30
src/lib.rs
|
|
@ -29,6 +29,7 @@ pub struct BlogdorTheAggregator {
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||||
pub struct FeedEntry {
|
pub struct FeedEntry {
|
||||||
url: String,
|
url: String,
|
||||||
|
feed_id: i64,
|
||||||
title: String,
|
title: String,
|
||||||
published: DateTime<Utc>,
|
published: DateTime<Utc>,
|
||||||
received: DateTime<Utc>,
|
received: DateTime<Utc>,
|
||||||
|
|
@ -111,21 +112,22 @@ async fn check_feeds(db: &SqlitePool, client: &reqwest::Client) {
|
||||||
|
|
||||||
async fn check_feed(
|
async fn check_feed(
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
id: i64,
|
feed_id: i64,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
url: String,
|
url: String,
|
||||||
tx: tokio::sync::mpsc::UnboundedSender<FeedEntry>,
|
tx: tokio::sync::mpsc::UnboundedSender<FeedEntry>,
|
||||||
) {
|
) -> Result<Vec<FeedEntry>, String> {
|
||||||
if let Ok(rec) = sqlx::query!(
|
let rec = sqlx::query!(
|
||||||
"select date_time from runs where succeeded = true and feed = ? order by id desc limit 1",
|
"select date_time from runs where succeeded = true and feed = ? order by id desc limit 1",
|
||||||
id
|
feed_id
|
||||||
)
|
)
|
||||||
.fetch_optional(&db)
|
.fetch_optional(&db)
|
||||||
.await
|
.await
|
||||||
{
|
.map_err(|e| format!("Could not fetch runs for {url} from DB, got {e}"))?;
|
||||||
|
|
||||||
let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
|
let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
|
let mut out = Vec::new();
|
||||||
let feed = client.get(&url).send().await;
|
let feed = client.get(&url).send().await;
|
||||||
if let Ok(feed) = feed
|
if let Ok(feed) = feed
|
||||||
&& let Ok(feed) = feed.bytes().await
|
&& let Ok(feed) = feed.bytes().await
|
||||||
|
|
@ -135,7 +137,13 @@ async fn check_feed(
|
||||||
let last_year = now - ONE_YEAR;
|
let last_year = now - ONE_YEAR;
|
||||||
if post.published.unwrap_or(last_year) > last_fetched {
|
if post.published.unwrap_or(last_year) > last_fetched {
|
||||||
let entry = FeedEntry {
|
let entry = FeedEntry {
|
||||||
url: url.clone(),
|
url: post
|
||||||
|
.links
|
||||||
|
.first()
|
||||||
|
.cloned()
|
||||||
|
.map(|l| l.href)
|
||||||
|
.unwrap_or("".to_string()),
|
||||||
|
feed_id,
|
||||||
title: post
|
title: post
|
||||||
.title
|
.title
|
||||||
.map(|t| t.content)
|
.map(|t| t.content)
|
||||||
|
|
@ -145,15 +153,11 @@ async fn check_feed(
|
||||||
feed_description: feed.description.to_owned().map(|d| d.content),
|
feed_description: feed.description.to_owned().map(|d| d.content),
|
||||||
body: post.content.and_then(|c| c.body),
|
body: post.content.and_then(|c| c.body),
|
||||||
};
|
};
|
||||||
if let Err(e) = tx.send(entry) {
|
out.push(entry);
|
||||||
tracing::error!("error sending feed entry: {e}");
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
// update DB
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_db_pool() -> SqlitePool {
|
async fn get_db_pool() -> SqlitePool {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue