initial skeleton: listens for POSTs to add feeds, checks for new feed content every hour.

Joe 2025-12-07 14:14:45 -08:00
commit 3114a88619
12 changed files with 3543 additions and 0 deletions

2
.env Normal file

@@ -0,0 +1,2 @@
DATABASE_URL=sqlite://./blogdor.db
DATABASE_FILE=./blogdor.db

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
/target
blogdor.db

3273
Cargo.lock generated Normal file

File diff suppressed because it is too large.

21
Cargo.toml Normal file

@@ -0,0 +1,21 @@
[package]
name = "blogdor"
description = "Blogdor the Aggregator"
version = "0.1.0"
edition = "2024"
[dependencies]
axum = { version = "0.8.7", default-features = false, features = ["http1", "http2", "json", "macros", "tokio"] }
clap = { version = "4.5.53", features = ["derive"] }
feed-rs = { version = "2.3.1", features = ["sanitize"] }
justerror = "1.1.0"
reqwest = "0.12.24"
sqlx = { version = "0.8.6", default-features = false, features = ["chrono", "derive", "macros", "migrate", "runtime-tokio", "sqlite", "tls-none"] }
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["full"] }
tokio-util = "0.7.17"
tracing = "0.1.43"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
[dev-dependencies]
rand = "0.9.2"

10
README.md Normal file

@@ -0,0 +1,10 @@
# Dev Setup
This project uses [statically-verified queries](https://docs.rs/sqlx/latest/sqlx/macro.query.html),
which require the presence of a database with the right schema.
- Install the SQLx CLI: `cargo install sqlx-cli`
- Create the DB: `sqlx db create` -- this will create a sqlite DB in the current directory called
`blogdor.db`
- Run the migrations: `sqlx migrate run`
- Live happily ever after.
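
For context, a minimal sketch of what such a statically-verified query can look like against the `feeds` table defined in the migrations below, assuming `DATABASE_URL` from `.env` points at a migrated `blogdor.db`; the function itself is illustrative and not part of this commit:

// Illustrative only: sqlx::query! checks this SQL and the result's column
// types against blogdor.db (located via DATABASE_URL) at compile time.
use sqlx::SqlitePool;

async fn list_active_feed_urls(db: &SqlitePool) -> Result<Vec<String>, sqlx::Error> {
    let rows = sqlx::query!("SELECT url FROM feeds WHERE active = TRUE")
        .fetch_all(db)
        .await?;
    Ok(rows.into_iter().map(|r| r.url).collect())
}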

5
build.rs Normal file

@@ -0,0 +1,5 @@
// generated by `sqlx migrate build-script`
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
}


@@ -0,0 +1 @@
DROP TABLE IF EXISTS feeds;


@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS feeds (
id INTEGER PRIMARY KEY,
url TEXT UNIQUE NOT NULL,
added_by TEXT NOT NULL,
last_modified_by TEXT NOT NULL,
active BOOLEAN NOT NULL DEFAULT FALSE,
created_at DATETIME NOT NULL DEFAULT current_timestamp,
updated_at DATETIME NOT NULL DEFAULT current_timestamp
);
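
A hedged sketch of how a row might be written into this table with a compile-time checked query; `add_feed` and its arguments are hypothetical, not part of this commit:

use sqlx::SqlitePool;

// Hypothetical helper: insert a feed URL, recording who added it.
// `active`, `created_at`, and `updated_at` fall back to their schema defaults.
async fn add_feed(db: &SqlitePool, url: &str, who: &str) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "INSERT INTO feeds (url, added_by, last_modified_by) VALUES (?, ?, ?)",
        url,
        who,
        who
    )
    .execute(db)
    .await?;
    Ok(())
}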


@@ -0,0 +1 @@
DROP TABLE IF EXISTS runs;


@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY,
date_time DATETIME NOT NULL DEFAULT current_timestamp,
succeeded BOOLEAN NOT NULL DEFAULT FALSE,
feed INTEGER NOT NULL,
FOREIGN KEY (feed) REFERENCES feeds(id)
);
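
A similar hedged sketch for recording an aggregation run against this table; `record_run` is hypothetical and not part of this commit:

use sqlx::SqlitePool;

// Hypothetical helper: record whether a fetch of the given feed succeeded.
// `feed_id` must reference an existing feeds.id, since the pool is opened
// with foreign_keys(true) in src/lib.rs; `date_time` uses its default.
async fn record_run(db: &SqlitePool, feed_id: i64, succeeded: bool) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "INSERT INTO runs (feed, succeeded) VALUES (?, ?)",
        feed_id,
        succeeded
    )
    .execute(db)
    .await?;
    Ok(())
}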

133
src/lib.rs Normal file

@@ -0,0 +1,133 @@
use std::{
//sync::{Arc, OnceLock},
time::Duration,
};
use reqwest::Client;
use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
types::chrono::DateTime,
types::chrono::Utc,
SqlitePool,
};
use tokio_util::sync::CancellationToken;
const MAX_CONNS: u32 = 200;
const MIN_CONNS: u32 = 5;
const TIMEOUT: u64 = 2000; // in milliseconds
pub struct BlogdorTheAggregator {
pub db: SqlitePool,
pub client: reqwest::Client,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FeedEntry {
url: String,
title: String,
published: DateTime<Utc>,
received: DateTime<Utc>,
description: Option<String>,
body: Option<String>,
}
impl BlogdorTheAggregator {
pub async fn new() -> Self {
let db = get_db_pool().await;
let client = reqwest::Client::new();
Self { db, client }
}
pub async fn aggregate(&self, cancel: CancellationToken) {
let db = self.db.clone();
let client = self.client.clone();
tokio::task::spawn(async move {
let mut alarm = tokio::time::interval(Duration::from_hours(1));
loop {
tokio::select! {
_ = alarm.tick() => {
check_feeds(&db, &client).await;
}
_ = cancel.cancelled() => {
tracing::info!("shutting down the aggregation loop");
break;
}
}
}
});
}
}
async fn check_feeds(_db: &SqlitePool, _client: &Client) {
tracing::debug!("checking feeds");
}
async fn get_db_pool() -> SqlitePool {
let db_filename = {
std::env::var("DATABASE_FILE").unwrap_or_else(|_| {
#[cfg(not(test))]
{
"blogdor.db".to_string()
}
#[cfg(test)]
{
use rand::RngCore;
let mut rng = rand::rng();
let id = rng.next_u64();
// see https://www.sqlite.org/inmemorydb.html for meaning of the string;
// it allows each separate test to have its own dedicated memory-backed db that
// will live as long as the whole process
format!("file:testdb-{id}?mode=memory&cache=shared")
}
})
};
tracing::info!("Connecting to DB at {db_filename}");
let conn_opts = SqliteConnectOptions::new()
.foreign_keys(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.filename(&db_filename)
.busy_timeout(Duration::from_millis(TIMEOUT)) // TIMEOUT is in milliseconds
.pragma("temp_store", "memory")
.create_if_missing(true)
.optimize_on_close(true, None)
.pragma("mmap_size", "3000000000");
let pool = SqlitePoolOptions::new()
.max_connections(MAX_CONNS)
.min_connections(MIN_CONNS)
.idle_timeout(Some(Duration::from_secs(3)))
.max_lifetime(Some(Duration::from_secs(3600)))
.connect_with(conn_opts)
.await
.expect("could not get sqlite pool");
sqlx::migrate!()
.run(&pool)
.await
.expect("could not run migrations");
tracing::info!("Ran migrations");
pool
}
//-************************************************************************
// Tests for `db` module.
//-************************************************************************
#[cfg(test)]
mod tests {
#[tokio::test]
async fn it_migrates_the_db() {
let db = super::get_db_pool().await;
let r = sqlx::query!("select count(*) as count from feeds")
.fetch_one(&db)
.await;
assert!(r.is_ok());
}
}
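
check_feeds is still a stub in this commit; as one possible direction (not part of the code above), fetching and parsing a single feed with the reqwest and feed-rs crates already listed in Cargo.toml could look roughly like this:

use feed_rs::parser;
use reqwest::Client;

// Illustrative only: download one feed and parse it; error mapping,
// conditional requests, and retry/backoff are deliberately left out.
async fn fetch_one_feed(
    client: &Client,
    url: &str,
) -> Result<feed_rs::model::Feed, Box<dyn std::error::Error>> {
    let bytes = client.get(url).send().await?.bytes().await?;
    // feed_rs::parser::parse accepts any std::io::Read source
    let feed = parser::parse(bytes.as_ref())?;
    tracing::debug!("parsed {} entries from {url}", feed.entries.len());
    Ok(feed)
}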

79
src/main.rs Normal file

@@ -0,0 +1,79 @@
use axum::{routing::post, Router};
use blogdor::BlogdorTheAggregator;
use sqlx::SqlitePool;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), ()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "blogdor=debug,axum=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let blogdor_the_aggregator = BlogdorTheAggregator::new().await;
let pool = blogdor_the_aggregator.db.clone();
let server = make_router(pool.clone());
let cancel = CancellationToken::new();
blogdor_the_aggregator.aggregate(cancel.clone()).await;
let server_handle = tokio::task::spawn(async move {
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
tracing::debug!("binding to {addr:?}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, server)
.with_graceful_shutdown(graceful_shutdown(cancel))
.await
.unwrap()
});
server_handle.await.unwrap();
pool.close().await;
Ok(())
}
fn make_router(db: SqlitePool) -> Router {
Router::new()
.route(
"/api/v1/add-feed",
post(async move || {
tracing::debug!("got a post to add a feed");
}),
)
.with_state(db)
}
async fn graceful_shutdown(cancel: CancellationToken) {
use tokio::signal;
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {tracing::info!("received ctrl-c, shutting down web server")},
_ = terminate => {tracing::info!("received kill signal, shutting down web server")},
}
cancel.cancel();
}
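
The /api/v1/add-feed route above only logs for now, even though the router already carries the pool via .with_state(db). A hedged sketch of the handler shape it might grow into, assuming a JSON body (serde is not yet a dependency in this commit, and all names here are hypothetical):

use axum::{extract::State, http::StatusCode, Json};
use serde::Deserialize;
use sqlx::SqlitePool;

// Hypothetical request body for the add-feed endpoint.
#[derive(Deserialize)]
struct AddFeedRequest {
    url: String,
}

// Illustrative handler shape only: the pool placed in the router via
// .with_state(db) arrives here through the State extractor.
async fn add_feed_handler(
    State(_db): State<SqlitePool>,
    Json(req): Json<AddFeedRequest>,
) -> StatusCode {
    tracing::debug!("got a post to add feed {}", req.url);
    // Persisting the feed (e.g. an INSERT like the one sketched under the
    // feeds migration) would go here.
    StatusCode::ACCEPTED
}

Such a handler would replace the inline closure, registered as .route("/api/v1/add-feed", post(add_feed_handler)).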