ignore non-exclusive messages, validate add requests
commit f65bae9014
parent c1434f64b9

5 changed files with 137 additions and 32 deletions
sample-data/ignore-leading-text.json (new file, 10 lines)
@@ -0,0 +1,10 @@
+{
+    "bot_email": "blogdor-outgoing-bot@zulip-host",
+    "token": "another_token",
+    "message": {
+        "sender_email": "sender-email",
+        "sender_id": 1,
+        "sender_full_name": "magoo",
+        "content": "blah blah blah @**blogdor's manager** add https://proclamations.nebcorp-hias.com/atom.xml"
+    }
+}
sample-data/valid-add.json (new file, 10 lines)
@@ -0,0 +1,10 @@
+{
+    "bot_email": "blogdor-outgoing-bot@zulip-host",
+    "token": "another_token",
+    "message": {
+        "sender_email": "sender-email",
+        "sender_id": 1,
+        "sender_full_name": "magoo",
+        "content": "@**blogdor's manager** add https://proclamations.nebcorp-hias.com/atom.xml"
+    }
+}
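The two fixtures differ only in the leading "blah blah blah" before the mention: valid-add.json should parse as an add command, while ignore-leading-text.json should now be ignored. A minimal sketch of replaying a fixture against a locally running instance; the bind address and route here are assumptions, not taken from this commit:

    // Hypothetical replay harness; the URL below is an assumption.
    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let body = std::fs::read_to_string("sample-data/ignore-leading-text.json")
            .expect("fixture should exist");
        let resp = reqwest::Client::new()
            .post("http://localhost:3000/") // assumed bind address and route
            .header("content-type", "application/json")
            .body(body)
            .send()
            .await?;
        // A non-exclusive mention should yield a bare 200 with no reply content.
        println!("{}", resp.status());
        Ok(())
    }

Either fixture should come back 200; per the handler change below, only the valid one carries a JSON reply body.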
src/lib.rs (40 changed lines)
@@ -1,7 +1,7 @@
 use std::time::Duration;
 
 use feed_rs::parser::parse;
-use reqwest::{Response, StatusCode};
+use reqwest::{Client, Response, StatusCode};
 use server::ServerState;
 use sqlx::{
     SqlitePool,
@@ -83,7 +83,7 @@ enum MessageType {
 impl BlogdorTheAggregator {
     pub async fn new() -> Self {
         let db = get_db_pool().await;
-        let client = reqwest::Client::new(); // TODO: retries?
+        let client = reqwest::Client::new();
         let cancel = CancellationToken::new();
         let endpoint = std::env::var("ZULIP_URL").expect("ZULIP_URL must be set");
         let channel_id: u32 = std::env::var("ZULIP_CHANNEL")
@@ -110,16 +110,21 @@
         }
     }
 
+    pub fn client(&self) -> reqwest::Client {
+        self.client.clone()
+    }
+
     pub async fn cancelled(&self) {
         self.cancel.cancelled().await
     }
 
-    pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
+    pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>, client: reqwest::Client) {
         let state = ServerState::new(
             self.db.clone(),
             &self.zulip_to_blogdor_email,
             &self.blogdor_token,
             announce_tx,
+            client,
         );
         server::spawn_server(state, self.cancel.clone()).await;
     }
@@ -205,6 +210,8 @@
             };
             if let Err(e) = self.send_zulip_message(&msg).await {
                 tracing::error!("error sending zulip message to user {user}: {e}");
+            } else {
+                tracing::debug!("sent DM to {user} about {url} being fucked");
             }
         }
     }
@@ -345,17 +352,8 @@ async fn check_feed(
     tracing::debug!("checking {url}");
     let last_fetched = rec.map(|d| d.date_time.and_utc()).unwrap_or(LAST_FETCHED);
     let now = Utc::now();
-    let mut entries = None;
-    let feed = client
-        .get(&url)
-        .send()
-        .await
-        .map_err(|e| format!("could not get feed from {url}, got {e}"))?
-        .bytes()
-        .await
-        .map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
-    let mut feed =
-        parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))?;
+
+    let mut feed = fetch_and_parse_feed(&url, &client).await?;
 
     if let Err(e) = sqlx::query!("insert into fetches (feed) values (?)", feed_id)
         .execute(&db)
@@ -364,6 +362,7 @@ async fn check_feed(
         tracing::error!("got error inserting {feed_id} into fetches: {e}");
     }
 
+    let mut entries = None;
    feed.entries.sort_by_key(|e| std::cmp::Reverse(e.posted()));
    for post in feed.entries.into_iter().take(5) {
        if post.posted().unwrap_or(LAST_FETCHED) > last_fetched {
@@ -405,6 +404,19 @@ async fn check_feed(
     })
 }
 
+async fn fetch_and_parse_feed(url: &str, client: &Client) -> Result<feed_rs::model::Feed, String> {
+    let feed = client
+        .get(url)
+        .send()
+        .await
+        .map_err(|e| format!("could not get feed from {url}, got {e}"))?
+        .bytes()
+        .await
+        .map_err(|e| format!("could not get bytes from response from {url}, got {e}"))?;
+
+    parse(feed.reader()).map_err(|e| format!("could not parse feed from {url}, got {e}"))
+}
+
 async fn get_db_pool() -> SqlitePool {
     let db_filename = {
         std::env::var("DATABASE_FILE").unwrap_or_else(|_| {
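A note on the new client() accessor: reqwest::Client keeps its connection pool behind an Arc, so clone() is cheap and all clones share one pool. That is why main can hand bta.client() to spawn_http by value while the fetch loop keeps its own handle. A small sketch of the pattern (the URLs are placeholders):

    // reqwest::Client::clone() is an Arc bump, not a new pool.
    async fn share_client() {
        let client = reqwest::Client::new();
        let for_task = client.clone(); // shares connections with `client`
        tokio::spawn(async move {
            let _ = for_task.get("https://example.com/feed.xml").send().await;
        });
        // `client` is still usable here, backed by the same pool.
        let _ = client.get("https://example.com/other.xml").send().await;
    }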
src/main.rs (2 changed lines)
@@ -12,7 +12,7 @@ async fn main() {
 
     let bta = BlogdorTheAggregator::new().await;
     let (tx, rx) = unbounded_channel();
-    bta.spawn_http(tx).await;
+    bta.spawn_http(tx, bta.client()).await;
 
     run_loop(&bta, rx).await;
 
src/server.rs (107 changed lines)
@@ -17,7 +17,7 @@ use winnow::{
     ascii::{newline, space0},
     combinator::{alt, eof, fail},
     error::{StrContext, StrContextValue},
-    token::take_while,
+    token::{literal, take_until, take_while},
 };
 
 use crate::NewFeed;
@@ -27,6 +27,7 @@ type Payload = Map<String, Value>;
 #[derive(Debug, Clone)]
 pub struct ServerState {
     db: SqlitePool,
+    client: reqwest::Client,
     email: String,
     token: String,
     announce_tx: UnboundedSender<NewFeed>,
@@ -38,9 +39,11 @@ impl ServerState {
         email: &str,
         token: &str,
         announce_tx: UnboundedSender<NewFeed>,
+        client: reqwest::Client,
     ) -> Self {
         Self {
             db,
+            client,
             announce_tx,
             email: email.to_string(),
             token: token.to_string(),
@@ -92,29 +95,37 @@ async fn handle_manage_feed(
     let mut resp: HashMap<&str, String> = HashMap::new();
 
     let command = parse_command(&mut content.as_str());
 
     let command = match command {
         Err(e) => {
             resp.insert("content", e.to_string());
             return Json(resp).into_response();
         }
-        Ok(c) => c,
+        Ok(c) => {
+            let Some(c) = c else {
+                return StatusCode::OK.into_response();
+            };
+            c
+        }
     };
 
     tracing::debug!(command = ?command);
 
     match command.action {
-        Action::Add => match add_feed(&state.db, sender_id, command.feed).await {
-            Ok(_) => {
-                let _ = state.announce_tx.send(NewFeed {
-                    feed: command.feed.to_string(),
-                    user: sender_full_name,
-                });
-                resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
-            }
-            Err(e) => {
-                resp.insert("content", format!("Blogdor Says: OH NO! {e}"));
-            }
-        },
+        Action::Add => {
+            match add_feed(&state.db, sender_id, command.feed, &state.client).await {
+                Ok(_) => {
+                    let _ = state.announce_tx.send(NewFeed {
+                        feed: command.feed.to_string(),
+                        user: sender_full_name,
+                    });
+                    resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
+                }
+                Err(e) => {
+                    resp.insert("content", format!("Blogdor Says: OH NO! {e}"));
+                }
+            }
+        }
         Action::Remove => match remove_feed(&state.db, sender_id, command.feed).await {
             Ok(_) => {
                 resp.insert("content", "Blogdor Says: BURNINATED!".to_string());
@@ -159,8 +170,16 @@ async fn remove_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> {
     Ok(())
 }
 
-async fn add_feed(db: &SqlitePool, user: u32, feed: &str) -> Result<(), String> {
+async fn add_feed(
+    db: &SqlitePool,
+    user: u32,
+    feed: &str,
+    client: &reqwest::Client,
+) -> Result<(), String> {
     add_user(db, user).await?;
+
+    let _ = crate::fetch_and_parse_feed(feed, client).await?;
+
     sqlx::query!(
         "update feeds set active = true, updated_at = current_timestamp where url = ? and added_by = ?",
         feed,
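The add path is now validated before activation: add_feed calls crate::fetch_and_parse_feed on the candidate URL, so anything unreachable or unparseable is rejected before the feed row is flipped active. A hedged sketch of the failure path, assuming a caller with access to this module's private add_feed (the URL is a placeholder):

    // Hypothetical caller; demonstrates the new up-front validation.
    async fn demo(db: &sqlx::SqlitePool, client: &reqwest::Client) {
        match add_feed(db, 1, "https://example.com/not-a-feed", client).await {
            Ok(()) => println!("feed activated"),
            // e.g. "could not parse feed from https://example.com/not-a-feed, got ..."
            Err(e) => println!("rejected: {e}"),
        }
    }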
@@ -277,8 +296,18 @@ struct FeedCommand<'req> {
     action: Action,
 }
 
-fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> {
-    let _ = alt(("@**blogdor's manager**", space0)).parse_next(input)?;
+fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<Option<FeedCommand<'i>>> {
+    let s = take_until::<_, _, ()>(0.., "@**blogdor's manager**").parse_next(input);
+    match s {
+        Err(_) => {}
+        Ok(s) => {
+            if !s.trim().is_empty() {
+                return Ok(None);
+            }
+        }
+    }
+
+    let _ = literal::<_, _, ()>("@**blogdor's manager**").parse_next(input);
 
     let action = (
         space0,
@@ -307,5 +336,49 @@ fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<FeedCommand<'i>> {
     )
         .map(|(_, f, _, _)| f)
         .parse_next(input)?;
-    Ok(FeedCommand { feed, action })
+    Ok(Some(FeedCommand { feed, action }))
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn command() {
+        let mut input = "blah blah blah @**blogdor's manager** yo yo";
+        let c = parse_command(&mut input).unwrap();
+        assert!(c.is_none());
+
+        let mut input = "@**blogdor's manager** yo yo";
+        let c = parse_command(&mut input);
+        assert!(c.is_err());
+        assert_eq!(input, "yo yo");
+
+        let mut input = "@**blogdor's manager** add feed-url";
+        let c = parse_command(&mut input).unwrap().unwrap();
+        assert_eq!(input, "");
+        assert_eq!(
+            c,
+            FeedCommand {
+                feed: "feed-url",
+                action: Action::Add
+            }
+        );
+
+        let mut input = "remove feed-url";
+        let c = parse_command(&mut input).unwrap().unwrap();
+        assert_eq!(input, "");
+        assert_eq!(
+            c,
+            FeedCommand {
+                feed: "feed-url",
+                action: Action::Remove
+            }
+        );
+
+        let mut input = "yo yo";
+        let c = parse_command(&mut input);
+        assert!(c.is_err());
+        assert_eq!(input, "yo yo");
+    }
+}
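The fixtures and the parser meet in the tests above; a sketch of two more cases built from the sample-data content, assuming it sits inside the same mod test where parse_command, FeedCommand, and Action are in scope:

    #[test]
    fn fixture_content() {
        // Mirrors sample-data/ignore-leading-text.json's "content" field:
        // a mention preceded by other text is ignored.
        let mut input =
            "blah blah blah @**blogdor's manager** add https://proclamations.nebcorp-hias.com/atom.xml";
        assert!(parse_command(&mut input).unwrap().is_none());

        // Mirrors sample-data/valid-add.json: an exclusive mention parses as an Add.
        let mut input =
            "@**blogdor's manager** add https://proclamations.nebcorp-hias.com/atom.xml";
        let c = parse_command(&mut input).unwrap().unwrap();
        assert_eq!(c.action, Action::Add);
    }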
|