use channels to send events from app to backend and back

this removes the notification and upload approval implementation from the backend; still need to
replace the gui pop-up with a ratatui widget, but now that's much easier.
This commit is contained in:
Joe Ardent 2025-07-27 17:03:29 -07:00
parent 2617e37911
commit 6558e18dec
3 changed files with 123 additions and 24 deletions

View file

@ -1,8 +1,13 @@
use std::{collections::BTreeMap, io, net::SocketAddr, sync::OnceLock, time::Duration}; use std::{collections::BTreeMap, net::SocketAddr, sync::OnceLock, time::Duration};
use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use joecalsend::{Config, JoecalState, Listeners, models::Device}; use joecalsend::{
Config, JoecalState, Listeners, TransferEvent, UploadDialog,
error::{LocalSendError, Result},
models::Device,
};
use native_dialog::MessageDialogBuilder;
use ratatui::{ use ratatui::{
DefaultTerminal, DefaultTerminal,
buffer::Buffer, buffer::Buffer,
@ -12,7 +17,10 @@ use ratatui::{
text::{Line, Text}, text::{Line, Text},
widgets::{Block, Paragraph, Widget}, widgets::{Block, Paragraph, Widget},
}; };
use tokio::task::JoinSet; use tokio::{
sync::mpsc::{UnboundedReceiver, unbounded_channel},
task::JoinSet,
};
pub mod ui; pub mod ui;
@ -24,6 +32,9 @@ pub struct App {
pub events: EventStream, pub events: EventStream,
// addr -> (alias, fingerprint) // addr -> (alias, fingerprint)
pub peers: Peers, pub peers: Peers,
// for getting messages back from the web server or web client about things we've done; the
// other end is held by the state
transfer_event_rx: OnceLock<UnboundedReceiver<TransferEvent>>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -34,6 +45,12 @@ pub enum CurrentScreen {
Stopping, Stopping,
} }
impl Default for App {
fn default() -> Self {
Self::new()
}
}
impl App { impl App {
pub fn new() -> Self { pub fn new() -> Self {
App { App {
@ -41,6 +58,7 @@ impl App {
screen: vec![CurrentScreen::Main], screen: vec![CurrentScreen::Main],
peers: Default::default(), peers: Default::default(),
events: Default::default(), events: Default::default(),
transfer_event_rx: Default::default(),
} }
} }
@ -50,14 +68,18 @@ impl App {
terminal: &mut DefaultTerminal, terminal: &mut DefaultTerminal,
config: Config, config: Config,
device: Device, device: Device,
) -> io::Result<()> { ) -> Result<()> {
let state = JoecalState::new(device) let (transfer_event_tx, transfer_event_rx) = unbounded_channel();
let state = JoecalState::new(device, transfer_event_tx)
.await .await
.expect("Could not create JoecalState"); .expect("Could not create JoecalState");
let _ = self.transfer_event_rx.set(transfer_event_rx);
let mut handles = JoinSet::new(); let mut handles = JoinSet::new();
state.start(&config, &mut handles).await; state.start(&config, &mut handles).await;
self.state.get_or_init(|| state); let _ = self.state.set(state);
loop { loop {
terminal.draw(|frame| self.draw(frame))?; terminal.draw(|frame| self.draw(frame))?;
self.handle_events().await?; self.handle_events().await?;
@ -83,7 +105,7 @@ impl App {
Ok(()) Ok(())
} }
async fn handle_events(&mut self) -> io::Result<()> { async fn handle_events(&mut self) -> Result<()> {
tokio::select! { tokio::select! {
event = self.events.next().fuse() => { event = self.events.next().fuse() => {
if let Some(Ok(evt)) = event { if let Some(Ok(evt)) = event {
@ -97,6 +119,39 @@ impl App {
} }
} }
} }
transfer_event = self.transfer_event_rx.get_mut().unwrap().recv() => {
if let Some(event) = transfer_event {
match event {
TransferEvent::UploadRequest { alias, id } => {
let sender =
self
.state
.get()
.unwrap()
.upload_requests
.lock()
.await
.get(&id).ok_or(LocalSendError::SessionInactive)?.clone();
// TODO: replace this with ratatui widget dialog
let upload_confirmed = MessageDialogBuilder::default()
.set_title(&alias)
.set_text("Do you want to receive files from this device?")
.confirm()
.show()
.unwrap();
if upload_confirmed {
let _ = sender.send(UploadDialog::UploadConfirm);
} else {
let _ = sender.send(UploadDialog::UploadDeny);
}
}
TransferEvent::Sent => {}
_ => {}
}
}
}
_ = tokio::time::sleep(Duration::from_millis(200)) => {} _ = tokio::time::sleep(Duration::from_millis(200)) => {}
} }

View file

@ -10,11 +10,15 @@ use std::{
sync::{Arc, OnceLock}, sync::{Arc, OnceLock},
}; };
use julid::Julid;
use models::Device; use models::Device;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{ use tokio::{
net::UdpSocket, net::UdpSocket,
sync::{Mutex, mpsc}, sync::{
Mutex,
mpsc::{self, UnboundedSender},
},
task::JoinSet, task::JoinSet,
}; };
use transfer::Session; use transfer::Session;
@ -33,6 +37,19 @@ pub enum Listeners {
Multicast, Multicast,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UploadDialog {
UploadDeny,
UploadConfirm,
}
pub enum TransferEvent {
Sent,
Received,
Failed,
UploadRequest { alias: String, id: Julid },
}
/// Contains the main network and backend state for an application session. /// Contains the main network and backend state for an application session.
#[derive(Clone)] #[derive(Clone)]
pub struct JoecalState { pub struct JoecalState {
@ -42,11 +59,18 @@ pub struct JoecalState {
pub running_state: Arc<Mutex<RunningState>>, pub running_state: Arc<Mutex<RunningState>>,
pub socket: Arc<UdpSocket>, pub socket: Arc<UdpSocket>,
pub client: reqwest::Client, pub client: reqwest::Client,
stop_tx: OnceLock<ShutdownSender>, pub upload_requests: Arc<Mutex<HashMap<Julid, UnboundedSender<UploadDialog>>>>,
shutdown_sender: OnceLock<ShutdownSender>,
// the receiving end will be held by the application so it can update the UI based on backend
// events
transfer_event_tx: UnboundedSender<TransferEvent>,
} }
impl JoecalState { impl JoecalState {
pub async fn new(device: Device) -> crate::error::Result<Self> { pub async fn new(
device: Device,
transfer_event_tx: UnboundedSender<TransferEvent>,
) -> crate::error::Result<Self> {
let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?; let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?;
socket.set_multicast_loop_v4(true)?; socket.set_multicast_loop_v4(true)?;
socket.set_multicast_ttl_v4(2)?; // one hop out from localnet socket.set_multicast_ttl_v4(2)?; // one hop out from localnet
@ -59,7 +83,9 @@ impl JoecalState {
peers: Default::default(), peers: Default::default(),
sessions: Default::default(), sessions: Default::default(),
running_state: Default::default(), running_state: Default::default(),
stop_tx: Default::default(), shutdown_sender: Default::default(),
upload_requests: Default::default(),
transfer_event_tx,
}) })
} }
@ -68,7 +94,7 @@ impl JoecalState {
let konfig = config.clone(); let konfig = config.clone();
handles.spawn({ handles.spawn({
let (tx, shutdown_rx) = mpsc::channel(1); let (tx, shutdown_rx) = mpsc::channel(1);
self.stop_tx.get_or_init(|| tx); let _ = self.shutdown_sender.set(tx);
async move { async move {
if let Err(e) = state.start_http_server(shutdown_rx, &konfig).await { if let Err(e) = state.start_http_server(shutdown_rx, &konfig).await {
eprintln!("HTTP server error: {e}"); eprintln!("HTTP server error: {e}");
@ -110,7 +136,7 @@ impl JoecalState {
let mut rstate = self.running_state.lock().await; let mut rstate = self.running_state.lock().await;
*rstate = RunningState::Stopping; *rstate = RunningState::Stopping;
let _ = self let _ = self
.stop_tx .shutdown_sender
.get() .get()
.expect("Could not get stop signal transmitter") .expect("Could not get stop signal transmitter")
.send(()) .send(())

View file

@ -8,11 +8,11 @@ use axum::{
response::IntoResponse, response::IntoResponse,
}; };
use julid::Julid; use julid::Julid;
use native_dialog::MessageDialogBuilder;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::unbounded_channel;
use crate::{ use crate::{
JoecalState, JoecalState, TransferEvent, UploadDialog,
error::{LocalSendError, Result}, error::{LocalSendError, Result},
models::{Device, FileMetadata}, models::{Device, FileMetadata},
}; };
@ -62,7 +62,6 @@ impl JoecalState {
} }
let peer = self.peers.lock().await.get(&peer).unwrap().clone(); let peer = self.peers.lock().await.get(&peer).unwrap().clone();
println!("Peer: {peer:?}");
let response = self let response = self
.client .client
@ -196,15 +195,34 @@ pub async fn register_prepare_upload(
) -> impl IntoResponse { ) -> impl IntoResponse {
println!("Received upload request from alias: {}", req.info.alias); println!("Received upload request from alias: {}", req.info.alias);
let result = MessageDialogBuilder::default() let id = Julid::new();
.set_title(&req.info.alias) let (tx, mut rx) = unbounded_channel();
.set_text("Do you want to receive files from this device?") state
.confirm() .upload_requests
.show() .lock()
.unwrap(); .await
.entry(id)
.insert_entry(tx);
if result { let dialog_send = state.transfer_event_tx.send(TransferEvent::UploadRequest {
let session_id = Julid::new().to_string(); alias: req.info.alias.clone(),
id,
});
match dialog_send {
Ok(_) => {}
Err(_e) => {
let _ = state.upload_requests.lock().await.remove(&id);
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
}
// safe to unwrap because it's only `None` when there are no more transmitters,
// and we still have the `tx` we created earlier
let result = rx.recv().await.unwrap();
let _ = state.upload_requests.lock().await.remove(&id);
if result == UploadDialog::UploadConfirm {
let session_id = id.as_string();
let file_tokens: HashMap<String, String> = req let file_tokens: HashMap<String, String> = req
.files .files