support more than adds
This commit is contained in:
parent
18c8749f91
commit
45a56b9fe9
3 changed files with 99 additions and 132 deletions
97
src/lib.rs
97
src/lib.rs
|
|
@ -55,23 +55,36 @@ pub struct FeedEntry {
|
||||||
body: Option<String>,
|
body: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct FeedResult {
|
pub struct FeedResult {
|
||||||
pub entries: Option<Vec<FeedEntry>>,
|
pub entries: Option<Vec<FeedEntry>>,
|
||||||
pub url: String,
|
pub url: String,
|
||||||
pub feed_id: i64,
|
pub feed_id: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct NewFeed {
|
pub struct FeedCommand {
|
||||||
url: String,
|
feed: String,
|
||||||
owner: u32,
|
action: Action,
|
||||||
result_sender: UnboundedSender<Result<(), String>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartialEq for NewFeed {
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum Action {
|
||||||
|
Add,
|
||||||
|
Remove,
|
||||||
|
Help,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct UserRequest {
|
||||||
|
command: FeedCommand,
|
||||||
|
owner: u32,
|
||||||
|
result_sender: UnboundedSender<Result<String, String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for UserRequest {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
self.url == other.url && self.owner == other.owner
|
self.command == other.command && self.owner == other.owner
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -149,12 +162,11 @@ impl BlogdorTheAggregator {
|
||||||
self.cancel.cancelled().await
|
self.cancel.cancelled().await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn spawn_http(&self, announce_tx: UnboundedSender<NewFeed>) {
|
pub async fn spawn_http(&self, user_request_tx: UnboundedSender<UserRequest>) {
|
||||||
let state = ServerState::new(
|
let state = ServerState::new(
|
||||||
self.db.clone(),
|
|
||||||
&self.zulip_to_blogdor_email,
|
&self.zulip_to_blogdor_email,
|
||||||
&self.blogdor_token,
|
&self.blogdor_token,
|
||||||
announce_tx,
|
user_request_tx,
|
||||||
);
|
);
|
||||||
server::spawn_server(state, self.cancel.clone()).await;
|
server::spawn_server(state, self.cancel.clone()).await;
|
||||||
}
|
}
|
||||||
|
|
@ -233,21 +245,40 @@ impl BlogdorTheAggregator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_feed(&self, add_request: &NewFeed) {
|
pub async fn process_user_request(&self, user_request: &UserRequest) {
|
||||||
if let Err(e) = self.add_user(add_request.owner).await {
|
match user_request.command.action {
|
||||||
let _ = add_request.result_sender.send(Err(e));
|
Action::Add => self.add_feed(user_request).await,
|
||||||
|
Action::Help => user_request.result_sender.send(Ok("DM or `@blogdor's manager` with `add <feed url, RSS or Atom XML files>`, `remove <feed url originally added by you>`, or `help` to get this message (duh).".to_string())).unwrap_or_default(),
|
||||||
|
Action::Remove => self.remove_feed(user_request).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_feed(&self, remove_request: &UserRequest) {
|
||||||
|
let _ = remove_request
|
||||||
|
.result_sender
|
||||||
|
.send(Err("currently unsupported".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn add_feed(&self, user_request: &UserRequest) {
|
||||||
|
if let Err(e) = self.add_user(user_request.owner).await {
|
||||||
|
let _ = user_request.result_sender.send(Err(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = crate::fetch_and_parse_feed(&add_request.url, &self.client).await {
|
let url = &user_request.command.feed;
|
||||||
let _ = add_request.result_sender.send(Err(e));
|
let owner = user_request.owner;
|
||||||
|
|
||||||
|
let msg;
|
||||||
|
|
||||||
|
if let Err(e) = crate::fetch_and_parse_feed(url, &self.client).await {
|
||||||
|
let _ = user_request.result_sender.send(Err(e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(id) = match sqlx::query!(
|
if let Some(id) = match sqlx::query!(
|
||||||
"select id from feeds where owner = ? and url = ?",
|
"select id from feeds where owner = ? and url = ?",
|
||||||
add_request.url,
|
owner,
|
||||||
add_request.owner
|
url
|
||||||
)
|
)
|
||||||
.fetch_optional(&self.db)
|
.fetch_optional(&self.db)
|
||||||
.await
|
.await
|
||||||
|
|
@ -255,7 +286,7 @@ impl BlogdorTheAggregator {
|
||||||
Ok(r) => r.map(|r| r.id),
|
Ok(r) => r.map(|r| r.id),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("couldn't fetch an optional row from the feeds table: {e}");
|
tracing::error!("couldn't fetch an optional row from the feeds table: {e}");
|
||||||
let _ = add_request
|
let _ = user_request
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(Err("whoa, something weird and bad happened".to_string()));
|
.send(Err("whoa, something weird and bad happened".to_string()));
|
||||||
return;
|
return;
|
||||||
|
|
@ -266,17 +297,19 @@ impl BlogdorTheAggregator {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("got error inserting into status: {e}");
|
tracing::error!("got error inserting into status: {e}");
|
||||||
let _ = add_request.result_sender.send(Err(format!(
|
let _ = user_request.result_sender.send(Err(format!(
|
||||||
"could not activate previously added feed at {} for Zulip user {}",
|
"could not activate previously added feed at {} for Zulip user {}",
|
||||||
&add_request.url, add_request.owner
|
url, owner
|
||||||
)));
|
)));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
msg = format!("marked previously added feed at {url} as active");
|
||||||
} else {
|
} else {
|
||||||
let txn = match self.db.begin().await {
|
let txn = match self.db.begin().await {
|
||||||
Ok(txn) => txn,
|
Ok(txn) => txn,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("got error begining a transaction: {e}");
|
tracing::error!("got error begining a transaction: {e}");
|
||||||
let _ = add_request
|
let _ = user_request
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(Err("could not add feed".to_string()));
|
.send(Err("could not add feed".to_string()));
|
||||||
return;
|
return;
|
||||||
|
|
@ -285,15 +318,15 @@ impl BlogdorTheAggregator {
|
||||||
// get the ID for the feed
|
// get the ID for the feed
|
||||||
let id = match sqlx::query!(
|
let id = match sqlx::query!(
|
||||||
"insert into feeds (url, owner) values (?, ?) returning id",
|
"insert into feeds (url, owner) values (?, ?) returning id",
|
||||||
add_request.url,
|
url,
|
||||||
add_request.owner
|
owner
|
||||||
)
|
)
|
||||||
.fetch_one(&self.db)
|
.fetch_one(&self.db)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("error inserting into feeds: {e}");
|
tracing::error!("error inserting into feeds: {e}");
|
||||||
let _ = add_request
|
let _ = user_request
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(Err("could not add feed".to_string()));
|
.send(Err("could not add feed".to_string()));
|
||||||
return;
|
return;
|
||||||
|
|
@ -307,7 +340,7 @@ impl BlogdorTheAggregator {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("error inserting into status: {e}");
|
tracing::error!("error inserting into status: {e}");
|
||||||
let _ = add_request
|
let _ = user_request
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(Err("could not add feed".to_string()));
|
.send(Err("could not add feed".to_string()));
|
||||||
return;
|
return;
|
||||||
|
|
@ -320,7 +353,7 @@ impl BlogdorTheAggregator {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("error inserting into runs: {e}");
|
tracing::error!("error inserting into runs: {e}");
|
||||||
let _ = add_request
|
let _ = user_request
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(Err("could not add feed".to_string()));
|
.send(Err("could not add feed".to_string()));
|
||||||
return;
|
return;
|
||||||
|
|
@ -329,18 +362,16 @@ impl BlogdorTheAggregator {
|
||||||
// woo!
|
// woo!
|
||||||
if let Err(e) = txn.commit().await {
|
if let Err(e) = txn.commit().await {
|
||||||
tracing::error!("error committing add-feed transaction: {e}");
|
tracing::error!("error committing add-feed transaction: {e}");
|
||||||
let _ = add_request
|
let _ = user_request
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(Err("could not add feed".to_string()));
|
.send(Err("could not add feed".to_string()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
msg = format!("added new feed at {url}");
|
||||||
}
|
}
|
||||||
let _ = add_request.result_sender.send(Ok(()));
|
let _ = user_request.result_sender.send(Ok(msg));
|
||||||
|
|
||||||
let content = format!(
|
let content = format!("@**|{}**: added a new feed: {}", owner, url);
|
||||||
"@**|{}**: added a new feed: {}",
|
|
||||||
add_request.owner, add_request.url
|
|
||||||
);
|
|
||||||
let msg = ZulipMessage {
|
let msg = ZulipMessage {
|
||||||
to: self.channel_id,
|
to: self.channel_id,
|
||||||
typ: MessageType::Stream,
|
typ: MessageType::Stream,
|
||||||
|
|
|
||||||
10
src/main.rs
10
src/main.rs
|
|
@ -1,6 +1,6 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use blogdor::{BlogdorTheAggregator, NewFeed};
|
use blogdor::{BlogdorTheAggregator, UserRequest};
|
||||||
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
|
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
|
@ -31,16 +31,16 @@ fn init_logs() {
|
||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_loop(bta: &BlogdorTheAggregator, mut announce_rx: UnboundedReceiver<NewFeed>) {
|
async fn run_loop(bta: &BlogdorTheAggregator, mut user_req_rx: UnboundedReceiver<UserRequest>) {
|
||||||
let mut check_feeds = tokio::time::interval(BLOGDOR_SNOOZE);
|
let mut check_feeds = tokio::time::interval(BLOGDOR_SNOOZE);
|
||||||
let mut check_stale = tokio::time::interval(Duration::from_hours(24));
|
let mut check_stale = tokio::time::interval(Duration::from_hours(24));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
announce = announce_rx.recv() => {
|
user_req = user_req_rx.recv() => {
|
||||||
if let Some(announce) = announce {
|
if let Some(ureq) = user_req {
|
||||||
bta.add_feed(&announce).await;
|
bta.process_user_request(&ureq).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = check_feeds.tick() => {
|
_ = check_feeds.tick() => {
|
||||||
|
|
|
||||||
124
src/server.rs
124
src/server.rs
|
|
@ -9,7 +9,6 @@ use axum::{
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::{Map, Value};
|
use serde_json::{Map, Value};
|
||||||
use sqlx::SqlitePool;
|
|
||||||
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
|
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use winnow::{
|
use winnow::{
|
||||||
|
|
@ -20,28 +19,21 @@ use winnow::{
|
||||||
token::{literal, take_until, take_while},
|
token::{literal, take_until, take_while},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::NewFeed;
|
use crate::{Action, FeedCommand, UserRequest};
|
||||||
|
|
||||||
type Payload = Map<String, Value>;
|
type Payload = Map<String, Value>;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct ServerState {
|
pub struct ServerState {
|
||||||
db: SqlitePool,
|
|
||||||
email: String,
|
email: String,
|
||||||
token: String,
|
token: String,
|
||||||
announce_tx: UnboundedSender<NewFeed>,
|
user_request_tx: UnboundedSender<UserRequest>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerState {
|
impl ServerState {
|
||||||
pub fn new(
|
pub fn new(email: &str, token: &str, user_request_tx: UnboundedSender<UserRequest>) -> Self {
|
||||||
db: SqlitePool,
|
|
||||||
email: &str,
|
|
||||||
token: &str,
|
|
||||||
announce_tx: UnboundedSender<NewFeed>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
db,
|
user_request_tx,
|
||||||
announce_tx,
|
|
||||||
email: email.to_string(),
|
email: email.to_string(),
|
||||||
token: token.to_string(),
|
token: token.to_string(),
|
||||||
}
|
}
|
||||||
|
|
@ -105,49 +97,28 @@ async fn handle_manage_feed(
|
||||||
|
|
||||||
tracing::debug!(command = ?command);
|
tracing::debug!(command = ?command);
|
||||||
|
|
||||||
match command.action {
|
let (tx, mut rx) = unbounded_channel();
|
||||||
Action::Add => {
|
match state.user_request_tx.send(UserRequest {
|
||||||
let (tx, mut rx) = unbounded_channel();
|
command,
|
||||||
match state.announce_tx.send(NewFeed {
|
owner: sender_id,
|
||||||
url: command.feed.to_string(),
|
result_sender: tx,
|
||||||
owner: sender_id,
|
}) {
|
||||||
result_sender: tx,
|
Ok(_) => {}
|
||||||
}) {
|
Err(e) => {
|
||||||
Ok(_) => {}
|
tracing::error!("could not send add feed message to runloop: {e}");
|
||||||
Err(e) => {
|
resp.insert("content", "oh no, something terrible happened!".to_string());
|
||||||
tracing::error!("could not send add feed message to runloop: {e}");
|
|
||||||
resp.insert("content", "oh no, something terrible happened!".to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let s = rx.recv().await;
|
|
||||||
dbg!(&s);
|
|
||||||
match s {
|
|
||||||
Some(Ok(_)) => {
|
|
||||||
dbg!("success");
|
|
||||||
resp.insert("content", "Blogdor Says: SUCCESS!".to_string());
|
|
||||||
}
|
|
||||||
Some(Err(e)) => {
|
|
||||||
dbg!(&e);
|
|
||||||
resp.insert("content", format!("Blogdor Says: ERRORED! {e}"));
|
|
||||||
}
|
|
||||||
None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Action::Remove => match remove_feed(&state.db, sender_id, command.feed).await {
|
|
||||||
Ok(_) => {
|
|
||||||
resp.insert("content", "sorry, that's currently unsupported".to_string());
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
resp.insert("content", e);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Action::Help => {
|
|
||||||
resp.insert("content", "DM or `@blogdor's manager` with `add <feed url, RSS or Atom XML files>`, `remove <feed url originally added by you>`, or `help` to get this message (duh).".to_string());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dbg!(&resp);
|
match rx.recv().await {
|
||||||
|
Some(Ok(r)) => {
|
||||||
|
resp.insert("content", format!("Blogdor Says: {r}"));
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
resp.insert("content", format!("Blogdor Says: ERRORED! {e}"));
|
||||||
|
}
|
||||||
|
None => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
|
||||||
|
}
|
||||||
|
|
||||||
Json(resp).into_response()
|
Json(resp).into_response()
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -155,31 +126,6 @@ async fn handle_manage_feed(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_feed(_db: &SqlitePool, _user: u32, _feed: &str) -> Result<(), String> {
|
|
||||||
// sqlx::query!(
|
|
||||||
// "update feeds set active = false, updated_at = current_timestamp where
|
|
||||||
// url = ? and added_by = ?", feed,
|
|
||||||
// user
|
|
||||||
// )
|
|
||||||
// .execute(db)
|
|
||||||
// .await
|
|
||||||
// .map_err(|e| {
|
|
||||||
// tracing::error!("could not set {feed} inactive by {user}, got {e}");
|
|
||||||
// "sorry buddy, Blogdor couldn't do that".to_string()
|
|
||||||
// })?;
|
|
||||||
// sqlx::query!(
|
|
||||||
// "select * from feeds where added_by = ? and active = false",
|
|
||||||
// user
|
|
||||||
// )
|
|
||||||
// .fetch_one(db)
|
|
||||||
// .await
|
|
||||||
// .map_err(|e| {
|
|
||||||
// tracing::error!("could not set {feed} inactive by {user}, got {e}");
|
|
||||||
// "sorry buddy, Blogdor couldn't do that".to_string()
|
|
||||||
// })?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn graceful_shutdown(cancel: CancellationToken) {
|
async fn graceful_shutdown(cancel: CancellationToken) {
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
let ctrl_c = async {
|
let ctrl_c = async {
|
||||||
|
|
@ -223,20 +169,7 @@ struct ZulipMessage {
|
||||||
_rest: Payload,
|
_rest: Payload,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<Option<FeedCommand>> {
|
||||||
enum Action {
|
|
||||||
Add,
|
|
||||||
Remove,
|
|
||||||
Help,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
||||||
struct FeedCommand<'req> {
|
|
||||||
feed: &'req str,
|
|
||||||
action: Action,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<Option<FeedCommand<'i>>> {
|
|
||||||
let s = take_until::<_, _, ()>(0.., "@**blogdor's manager**").parse_next(input);
|
let s = take_until::<_, _, ()>(0.., "@**blogdor's manager**").parse_next(input);
|
||||||
match s {
|
match s {
|
||||||
Err(_) => {}
|
Err(_) => {}
|
||||||
|
|
@ -276,7 +209,10 @@ fn parse_command<'i>(input: &mut &'i str) -> winnow::Result<Option<FeedCommand<'
|
||||||
)
|
)
|
||||||
.map(|(_, f, _, _)| f)
|
.map(|(_, f, _, _)| f)
|
||||||
.parse_next(input)?;
|
.parse_next(input)?;
|
||||||
Ok(Some(FeedCommand { feed, action }))
|
Ok(Some(FeedCommand {
|
||||||
|
feed: feed.to_string(),
|
||||||
|
action,
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
@ -300,7 +236,7 @@ mod test {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
c,
|
c,
|
||||||
FeedCommand {
|
FeedCommand {
|
||||||
feed: "feed-url",
|
feed: "feed-url".to_string(),
|
||||||
action: Action::Add
|
action: Action::Add
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
@ -311,7 +247,7 @@ mod test {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
c,
|
c,
|
||||||
FeedCommand {
|
FeedCommand {
|
||||||
feed: "feed-url",
|
feed: "feed-url".to_string(),
|
||||||
action: Action::Remove
|
action: Action::Remove
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue