Compare commits
No commits in common. "fe1318ba4bed791193a0daeb3f468ff8790c0be4" and "452e413edec78f937f7342f468ef5210eb65094b" have entirely different histories.
fe1318ba4b
...
452e413ede
7 changed files with 39 additions and 155 deletions
1
migrations/.#0001_users.up.sql
Symbolic link
1
migrations/.#0001_users.up.sql
Symbolic link
|
|
@ -0,0 +1 @@
|
|||
ardent@pinkee.17538:1750983153
|
||||
|
|
@ -1 +1 @@
|
|||
DROP TABLE IF EXISTS users;
|
||||
-- Add down migration script here
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY,
|
||||
zulip_id INT UNIQUE NOT NULL
|
||||
|
||||
);
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@ CREATE TABLE IF NOT EXISTS feeds (
|
|||
id INTEGER PRIMARY KEY,
|
||||
url TEXT UNIQUE NOT NULL,
|
||||
added_by INT NOT NULL,
|
||||
last_modified_by TEXT NOT NULL,
|
||||
active BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
created_at DATETIME NOT NULL DEFAULT current_timestamp,
|
||||
updated_at DATETIME NOT NULL DEFAULT current_timestamp,
|
||||
FOREIGN KEY (added_by) REFERENCES users(zulip_id)
|
||||
updated_at DATETIME NOT NULL DEFAULT current_timestamp
|
||||
);
|
||||
|
|
|
|||
56
src/lib.rs
56
src/lib.rs
|
|
@ -1,14 +1,14 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use feed_rs::parser::parse;
|
||||
use reqwest::{Response, StatusCode};
|
||||
use reqwest::StatusCode;
|
||||
use server::ServerState;
|
||||
use sqlx::{
|
||||
SqlitePool,
|
||||
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
|
||||
types::chrono::{DateTime, Utc},
|
||||
};
|
||||
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::{bytes::Buf, sync::CancellationToken};
|
||||
use unicode_segmentation::UnicodeSegmentation;
|
||||
|
||||
|
|
@ -55,12 +55,6 @@ pub struct FeedResult {
|
|||
pub feed_id: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct NewFeed {
|
||||
feed: String,
|
||||
user: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
|
||||
struct ZulipMessage<'s> {
|
||||
to: u32,
|
||||
|
|
@ -105,12 +99,11 @@ impl BlogdorTheAggregator {
|
|||
self.cancel.cancelled().await
|
||||
}
|
||||
|
||||
pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
|
||||
pub async fn spawn_http(&self) {
|
||||
let state = ServerState::new(
|
||||
self.db.clone(),
|
||||
&self.zulip_to_blogdor_email,
|
||||
&self.blogdor_token,
|
||||
announce_tx,
|
||||
);
|
||||
server::spawn_server(state, self.cancel.clone()).await;
|
||||
}
|
||||
|
|
@ -145,26 +138,6 @@ impl BlogdorTheAggregator {
|
|||
Ok(feed_results)
|
||||
}
|
||||
|
||||
pub async fn announce_feed(&self, announce: &NewFeed) {
|
||||
let content = format!("{} added a new feed: {}", announce.user, announce.feed);
|
||||
let msg = ZulipMessage {
|
||||
to: self.channel_id,
|
||||
typ: "stream",
|
||||
content,
|
||||
topic: Some("New feeds"),
|
||||
};
|
||||
match self.send_zulip_message(&msg).await {
|
||||
Err(e) => {
|
||||
tracing::error!("got error sending to zulip: {e}");
|
||||
}
|
||||
Ok(r) => {
|
||||
if r.status() != StatusCode::OK {
|
||||
tracing::warn!("did not successfully post to zulip: status {}", r.status());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// will also update the successful_runs table if it posts to zulip
|
||||
pub async fn post_entries(&self, posts: &[FeedEntry]) {
|
||||
let FeedEntry {
|
||||
|
|
@ -192,8 +165,17 @@ impl BlogdorTheAggregator {
|
|||
content,
|
||||
topic: Some(&post.title),
|
||||
};
|
||||
let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
|
||||
|
||||
match self.send_zulip_message(&msg).await {
|
||||
match self
|
||||
.client
|
||||
.post(&self.endpoint)
|
||||
.basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token))
|
||||
.body(msg)
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
tracing::error!("got error sending to zulip: {e}");
|
||||
success = false;
|
||||
|
|
@ -225,18 +207,6 @@ impl BlogdorTheAggregator {
|
|||
pub async fn close_db(&self) {
|
||||
self.db.close().await;
|
||||
}
|
||||
|
||||
async fn send_zulip_message<'s>(&'s self, msg: &ZulipMessage<'s>) -> Result<Response, String> {
|
||||
let msg = serde_urlencoded::to_string(msg).expect("serialize msg");
|
||||
self.client
|
||||
.post(&self.endpoint)
|
||||
.basic_auth(&self.blogdor_to_zulip_email, Some(&self.zulip_token))
|
||||
.body(msg)
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("{e}"))
|
||||
}
|
||||
}
|
||||
|
||||
// takes args by value because it's meant to be called from inside a spawned
|
||||
|
|
|
|||
15
src/main.rs
15
src/main.rs
|
|
@ -1,7 +1,6 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use blogdor::{BlogdorTheAggregator, NewFeed};
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
|
||||
use blogdor::BlogdorTheAggregator;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
const BLOGDOR_SNOOZE: Duration = Duration::from_hours(1);
|
||||
|
|
@ -11,10 +10,9 @@ async fn main() {
|
|||
init_logs();
|
||||
|
||||
let bta = BlogdorTheAggregator::new().await;
|
||||
let (tx, rx) = unbounded_channel();
|
||||
bta.spawn_http(tx).await;
|
||||
bta.spawn_http().await;
|
||||
|
||||
run_loop(&bta, rx).await;
|
||||
run_loop(&bta).await;
|
||||
|
||||
bta.close_db().await;
|
||||
|
||||
|
|
@ -31,16 +29,11 @@ fn init_logs() {
|
|||
.init();
|
||||
}
|
||||
|
||||
async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
|
||||
async fn run_loop(bta: &BlogdorTheAggregator) {
|
||||
let mut alarm = tokio::time::interval(BLOGDOR_SNOOZE);
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
announce = announce_rx.recv() => {
|
||||
if let Some(announce) = announce {
|
||||
bta.announce_feed(&announce).await;
|
||||
}
|
||||
}
|
||||
_ = alarm.tick() => {
|
||||
match bta.check_feeds().await {
|
||||
Ok(results) => {
|
||||
|
|
|
|||
113
src/server.rs
113
src/server.rs
|
|
@ -10,18 +10,15 @@ use axum::{
|
|||
use serde::Deserialize;
|
||||
use serde_json::{Map, Value};
|
||||
use sqlx::SqlitePool;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use winnow::{
|
||||
Parser,
|
||||
ascii::{newline, space0},
|
||||
combinator::{alt, eof, fail},
|
||||
error::{StrContext, StrContextValue},
|
||||
ascii::space0,
|
||||
combinator::{alt, fail},
|
||||
error::StrContext,
|
||||
token::take_while,
|
||||
};
|
||||
|
||||
use crate::NewFeed;
|
||||
|
||||
type Payload = Map<String, Value>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -29,19 +26,12 @@ pub struct ServerState {
|
|||
db: SqlitePool,
|
||||
email: String,
|
||||
token: String,
|
||||
announce_tx: UnboundedSender<NewFeed>,
|
||||
}
|
||||
|
||||
impl ServerState {
|
||||
pub fn new(
|
||||
db: SqlitePool,
|
||||
email: &str,
|
||||
token: &str,
|
||||
announce_tx: UnboundedSender<NewFeed>,
|
||||
) -> Self {
|
||||
pub fn new(db: SqlitePool, email: &str, token: &str) -> Self {
|
||||
Self {
|
||||
db,
|
||||
announce_tx,
|
||||
email: email.to_string(),
|
||||
token: token.to_string(),
|
||||
}
|
||||
|
|
@ -80,10 +70,13 @@ async fn handle_manage_feed(
|
|||
message,
|
||||
_rest: _,
|
||||
} = request;
|
||||
|
||||
tracing::debug!("email: {bot_email}, token: {token}");
|
||||
if state.email == bot_email && state.token == token {
|
||||
tracing::debug!("gonna do a thing with {message:?}");
|
||||
|
||||
let ZulipMessage {
|
||||
content,
|
||||
mut content,
|
||||
ref sender_email,
|
||||
sender_id,
|
||||
sender_full_name,
|
||||
..
|
||||
|
|
@ -94,83 +87,18 @@ async fn handle_manage_feed(
|
|||
let command = parse_command(&mut content.as_str());
|
||||
let command = match command {
|
||||
Err(e) => {
|
||||
resp.insert("content", e.to_string());
|
||||
resp["content"] = format!("I did not understand: {e}");
|
||||
return Json(resp).into_response();
|
||||
}
|
||||
Ok(c) => c,
|
||||
};
|
||||
|
||||
tracing::debug!(command = ?command);
|
||||
|
||||
match command.action {
|
||||
Action::Add => match add_feed(&state.db, sender_id, command.feed).await {
|
||||
Ok(_) => {
|
||||
let _ = state.announce_tx.send(NewFeed {
|
||||
feed: command.feed.to_string(),
|
||||
user: sender_full_name,
|
||||
});
|
||||
resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
resp.insert("content", format!("Blogdor Says: OH NO! {e}"));
|
||||
}
|
||||
},
|
||||
Action::Remove => return StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS.into_response(),
|
||||
Action::Help => {
|
||||
resp.insert("content", "DM or `@blogdor's manager` with `add <feed url, RSS or Atom XML files>`, `remove <feed url originally added by you>`, or `help` to get this message (duh).".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Json(resp).into_response()
|
||||
} else {
|
||||
StatusCode::IM_A_TEAPOT.into_response()
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> {
|
||||
add_user(db, user).await?;
|
||||
|
||||
sqlx::query!(
|
||||
"insert into feeds (url, added_by, active) values (?, ?, true)",
|
||||
feed,
|
||||
user
|
||||
)
|
||||
.execute(db)
|
||||
.await
|
||||
.map_err(|e| format!("{e}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_user(db: &SqlitePool, user: u32) -> Result<(), String> {
|
||||
if let Err(e) = sqlx::query!("insert into users (zulip_id) values (?)", user)
|
||||
.execute(db)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
sqlx::Error::Database(database_error) => {
|
||||
// the users table has only one constraint, which is a uniqueness one on
|
||||
// zulip_id, so if it's violated, we don't care, it just means we already have
|
||||
// that user; if it's not a constraint violation, then something
|
||||
// else and bad has happened
|
||||
if database_error.constraint().is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
sqlx::Error::Io(error) => {
|
||||
tracing::error!("got IO error: {error}");
|
||||
return Err("you should maybe retry that".to_string());
|
||||
}
|
||||
_ => return Err("yikes".to_string()),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn _remove_feed() -> Result<(), String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn graceful_shutdown(cancel: CancellationToken) {
|
||||
use tokio::signal;
|
||||
let ctrl_c = async {
|
||||
|
|
@ -209,37 +137,35 @@ struct ManageFeedMessage {
|
|||
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
|
||||
struct ZulipMessage {
|
||||
content: String,
|
||||
sender_email: String,
|
||||
sender_id: u32,
|
||||
sender_full_name: String,
|
||||
#[serde(flatten)]
|
||||
_rest: Payload,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum Action {
|
||||
Add,
|
||||
Remove,
|
||||
Help,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct FeedCommand<'req> {
|
||||
feed: &'req str,
|
||||
action: Action,
|
||||
}
|
||||
|
||||
fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> {
|
||||
let _ = alt(("@**blogdor's manager**", space0)).parse_next(input)?;
|
||||
|
||||
let _ = "**@blogdor's manager**".parse_next(input)?;
|
||||
let action = (
|
||||
space0,
|
||||
alt((
|
||||
"add",
|
||||
"remove",
|
||||
"help",
|
||||
fail.context(StrContext::Expected(StrContextValue::Description(
|
||||
"`add <feed url>`, `remove <feed url>`, or `help`",
|
||||
))),
|
||||
fail.context(StrContext::Expected(
|
||||
"supported commands are `add`, `remove`, or `help`".into(),
|
||||
)),
|
||||
)),
|
||||
)
|
||||
.map(|(_, a)| match a {
|
||||
|
|
@ -250,13 +176,8 @@ fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> {
|
|||
})
|
||||
.parse_next(input)?;
|
||||
|
||||
let feed = (
|
||||
space0,
|
||||
take_while(0.., |c: char| !c.is_whitespace()),
|
||||
space0,
|
||||
alt((eof.void(), newline.void())),
|
||||
)
|
||||
.map(|(_, f, _, _)| f)
|
||||
let feed = (space0, take_while(0.., |c: char| !c.is_whitespace()))
|
||||
.map(|(_, f)| f)
|
||||
.parse_next(input)?;
|
||||
Ok(FeedCommand { feed, action })
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue