Compare commits

..

3 commits

Author SHA1 Message Date
Joe
ab5f97d05f heavy start on add-feeds endpoint 2025-12-14 22:53:29 -08:00
Joe
07ae653d49 better message formatting 2025-12-14 15:20:48 -08:00
Joe
a1b1bffce9 remove moro 2025-12-14 14:42:22 -08:00
5 changed files with 129 additions and 132 deletions

71
Cargo.lock generated
View file

@ -89,28 +89,6 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener 2.5.3",
"futures-core",
]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]] [[package]]
name = "atoi" name = "atoi"
version = "2.0.0" version = "2.0.0"
@ -231,10 +209,10 @@ dependencies = [
"feed-rs", "feed-rs",
"html2md", "html2md",
"justerror", "justerror",
"moro",
"rand 0.9.2", "rand 0.9.2",
"reqwest", "reqwest",
"serde", "serde",
"serde_json",
"serde_urlencoded", "serde_urlencoded",
"sqlx", "sqlx",
"thiserror 2.0.17", "thiserror 2.0.17",
@ -558,12 +536,6 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]] [[package]]
name = "event-listener" name = "event-listener"
version = "5.4.1" version = "5.4.1"
@ -662,21 +634,6 @@ dependencies = [
"new_debug_unreachable", "new_debug_unreachable",
] ]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -721,17 +678,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -750,10 +696,8 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
@ -1436,17 +1380,6 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "moro"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8472c674b8319e7529bfdb3c51216810e36727be2056136d07130a0b1c132df6"
dependencies = [
"async-channel",
"async-trait",
"futures",
]
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.14" version = "0.2.14"
@ -2247,7 +2180,7 @@ dependencies = [
"crc", "crc",
"crossbeam-queue", "crossbeam-queue",
"either", "either",
"event-listener 5.4.1", "event-listener",
"futures-core", "futures-core",
"futures-intrusive", "futures-intrusive",
"futures-io", "futures-io",

View file

@ -10,9 +10,9 @@ clap = { version = "4.5.53", features = ["derive"] }
feed-rs = { version = "2.3.1", features = ["sanitize"] } feed-rs = { version = "2.3.1", features = ["sanitize"] }
html2md = "0.2.15" html2md = "0.2.15"
justerror = "1.1.0" justerror = "1.1.0"
moro = "0.4.0"
reqwest = "0.12.24" reqwest = "0.12.24"
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
serde_urlencoded = "0.7.1" serde_urlencoded = "0.7.1"
sqlx = { version = "0.8.6", default-features = false, features = ["chrono", "derive", "macros", "migrate", "runtime-tokio", "sqlite", "tls-none"] } sqlx = { version = "0.8.6", default-features = false, features = ["chrono", "derive", "macros", "migrate", "runtime-tokio", "sqlite", "tls-none"] }
thiserror = "2.0.17" thiserror = "2.0.17"

View file

@ -2,6 +2,7 @@ use std::time::Duration;
use feed_rs::parser::parse; use feed_rs::parser::parse;
use reqwest::StatusCode; use reqwest::StatusCode;
use server::ServerState;
use sqlx::{ use sqlx::{
SqlitePool, SqlitePool,
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
@ -30,7 +31,8 @@ pub struct BlogdorTheAggregator {
endpoint: String, endpoint: String,
channel_id: u32, channel_id: u32,
email: String, email: String,
password: String, password: String, // sent *to zulip* in POSTs *from us*
token: String, // checked against incoming POSTs *from zulip*
} }
#[derive(Debug, Default, Clone, PartialEq, Eq)] #[derive(Debug, Default, Clone, PartialEq, Eq)]
@ -72,9 +74,9 @@ impl BlogdorTheAggregator {
.expect("ZULIP_CHANNEL must be set") .expect("ZULIP_CHANNEL must be set")
.parse() .parse()
.expect("ZULIP_CHANNEL must be an integer"); .expect("ZULIP_CHANNEL must be an integer");
let email = std::env::var("BLOGDOR_EMAIL").expect("BLOGDOR_EMAIL must be set");
let password = std::env::var("ZULIP_TOKEN").expect("ZULIP_TOKEN must be set"); let password = std::env::var("ZULIP_TOKEN").expect("ZULIP_TOKEN must be set");
let email = std::env::var("BLOGDOR_EMAIL").expect("BLOGDOR_EMAIL must be set");
let token = std::env::var("BLOGDOR_TOKEN").expect("BLOGDOR_TOKEN must be set");
Self { Self {
db, db,
@ -84,6 +86,7 @@ impl BlogdorTheAggregator {
channel_id, channel_id,
email, email,
password, password,
token,
} }
} }
@ -92,7 +95,8 @@ impl BlogdorTheAggregator {
} }
pub async fn spawn_http(&self) { pub async fn spawn_http(&self) {
server::spawn_server(self.db.clone(), self.cancel.clone()).await; let state = ServerState::new(self.db.clone(), &self.email, &self.token);
server::spawn_server(state, self.cancel.clone()).await;
} }
pub async fn check_feeds(&self) -> Result<Vec<Result<FeedResult, String>>, String> { pub async fn check_feeds(&self) -> Result<Vec<Result<FeedResult, String>>, String> {
@ -125,22 +129,27 @@ impl BlogdorTheAggregator {
Ok(feed_results) Ok(feed_results)
} }
// will also update the successful_runs table if it posts to zulip
pub async fn post_entries(&self, posts: &[FeedEntry]) { pub async fn post_entries(&self, posts: &[FeedEntry]) {
let FeedEntry { let FeedEntry {
feed_id, received, .. feed_id, received, ..
} = posts.last().unwrap(); } = posts.last().unwrap();
let mut success = true; let mut success = true;
for post in posts.iter() { for post in posts.iter() {
let body = post let body = post.body.as_deref().unwrap_or("Blogdor Says: NO BODY!");
.body
.iter() let tail = if body.len() < ZULIP_MESSAGE_CUTOFF {
.next() ""
.cloned() } else {
.unwrap_or("Blogdor Says: NO BODY!".to_string()); "..."
};
let content = format!( let content = format!(
"{body} ...\n\n---\noriginally posted to {}, on {}", "{body}{tail}\n\n---\noriginally published on [{}]({})",
post.post_url, post.published post.published.format("%B %e, %Y"),
post.post_url
); );
let msg = ZulipMessage { let msg = ZulipMessage {
to: self.channel_id, to: self.channel_id,
typ: "stream", typ: "stream",

View file

@ -11,6 +11,7 @@ async fn main() {
let bta = BlogdorTheAggregator::new().await; let bta = BlogdorTheAggregator::new().await;
bta.spawn_http().await; bta.spawn_http().await;
run_loop(&bta).await; run_loop(&bta).await;
bta.close_db().await; bta.close_db().await;
@ -30,8 +31,6 @@ fn init_logs() {
async fn run_loop(bta: &BlogdorTheAggregator) { async fn run_loop(bta: &BlogdorTheAggregator) {
let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE); let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE);
moro::async_scope!(|scope| {
scope.spawn(async {
loop { loop {
tokio::select! { tokio::select! {
biased; biased;
@ -71,7 +70,4 @@ async fn run_loop(bta: &BlogdorTheAggregator) {
} }
} }
} }
});
})
.await;
} }

View file

@ -1,15 +1,42 @@
use std::net::SocketAddr; use std::{net::SocketAddr, sync::Arc};
use axum::{Router, routing::post}; use axum::{
Router,
extract::{Json, State},
http::StatusCode,
response::IntoResponse,
routing::post,
};
use serde::Deserialize;
use serde_json::{Map, Value};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
type Payload = Map<String, Value>;
#[derive(Debug, Clone)]
pub struct ServerState {
db: SqlitePool,
email: String,
token: String,
}
impl ServerState {
pub fn new(db: SqlitePool, email: &str, token: &str) -> Self {
Self {
db,
email: email.to_string(),
token: token.to_string(),
}
}
}
pub(crate) async fn spawn_server( pub(crate) async fn spawn_server(
pool: SqlitePool, state: ServerState,
cancel: CancellationToken, cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> { ) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move { tokio::task::spawn(async move {
let server = make_router(pool); let server = make_router(state);
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
tracing::debug!("binding to {addr:?}"); tracing::debug!("binding to {addr:?}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
@ -20,15 +47,28 @@ pub(crate) async fn spawn_server(
}) })
} }
fn make_router(db: SqlitePool) -> Router { fn make_router(state: ServerState) -> Router {
Router::new() Router::new()
.route( .route("/api/v1/add-feed", post(handle_add_feed))
"/api/v1/add-feed", .with_state(state.into())
post(async move || { }
tracing::debug!("got a post to add a feed");
}), async fn handle_add_feed(
) State(state): State<Arc<ServerState>>,
.with_state(db) Json(request): Json<AddFeedRequest>,
) -> impl IntoResponse {
let AddFeedRequest {
bot_email,
token,
message,
_rest: _,
} = request;
if state.email == bot_email && state.token == token {
tracing::debug!("gonna do a thing with {message:?}");
} else {
tracing::debug!("psych");
}
(StatusCode::IM_A_TEAPOT, "nee-ope")
} }
async fn graceful_shutdown(cancel: CancellationToken) { async fn graceful_shutdown(cancel: CancellationToken) {
@ -56,3 +96,22 @@ async fn graceful_shutdown(cancel: CancellationToken) {
} }
cancel.cancel(); cancel.cancel();
} }
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
struct AddFeedRequest {
bot_email: String,
token: String,
message: ZulipMessage,
#[serde(flatten)]
_rest: Payload,
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
struct ZulipMessage {
content: String,
sender_email: String,
sender_id: u32,
sender_full_name: String,
#[serde(flatten)]
_rest: Payload,
}