diff --git a/src/frontend/mod.rs b/src/frontend/mod.rs index d0f225d..09febe6 100644 --- a/src/frontend/mod.rs +++ b/src/frontend/mod.rs @@ -1,8 +1,13 @@ -use std::{collections::BTreeMap, io, net::SocketAddr, sync::OnceLock, time::Duration}; +use std::{collections::BTreeMap, net::SocketAddr, sync::OnceLock, time::Duration}; use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use futures::{FutureExt, StreamExt}; -use joecalsend::{Config, JoecalState, Listeners, models::Device}; +use joecalsend::{ + Config, JoecalState, Listeners, TransferEvent, UploadDialog, + error::{LocalSendError, Result}, + models::Device, +}; +use native_dialog::MessageDialogBuilder; use ratatui::{ DefaultTerminal, buffer::Buffer, @@ -12,7 +17,10 @@ use ratatui::{ text::{Line, Text}, widgets::{Block, Paragraph, Widget}, }; -use tokio::task::JoinSet; +use tokio::{ + sync::mpsc::{UnboundedReceiver, unbounded_channel}, + task::JoinSet, +}; pub mod ui; @@ -24,6 +32,9 @@ pub struct App { pub events: EventStream, // addr -> (alias, fingerprint) pub peers: Peers, + // for getting messages back from the web server or web client about things we've done; the + // other end is held by the state + transfer_event_rx: OnceLock>, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -34,6 +45,12 @@ pub enum CurrentScreen { Stopping, } +impl Default for App { + fn default() -> Self { + Self::new() + } +} + impl App { pub fn new() -> Self { App { @@ -41,6 +58,7 @@ impl App { screen: vec![CurrentScreen::Main], peers: Default::default(), events: Default::default(), + transfer_event_rx: Default::default(), } } @@ -50,14 +68,18 @@ impl App { terminal: &mut DefaultTerminal, config: Config, device: Device, - ) -> io::Result<()> { - let state = JoecalState::new(device) + ) -> Result<()> { + let (transfer_event_tx, transfer_event_rx) = unbounded_channel(); + + let state = JoecalState::new(device, transfer_event_tx) .await .expect("Could not create JoecalState"); + let _ = self.transfer_event_rx.set(transfer_event_rx); + let mut handles = JoinSet::new(); state.start(&config, &mut handles).await; - self.state.get_or_init(|| state); + let _ = self.state.set(state); loop { terminal.draw(|frame| self.draw(frame))?; self.handle_events().await?; @@ -83,7 +105,7 @@ impl App { Ok(()) } - async fn handle_events(&mut self) -> io::Result<()> { + async fn handle_events(&mut self) -> Result<()> { tokio::select! { event = self.events.next().fuse() => { if let Some(Ok(evt)) = event { @@ -97,6 +119,39 @@ impl App { } } } + transfer_event = self.transfer_event_rx.get_mut().unwrap().recv() => { + if let Some(event) = transfer_event { + match event { + TransferEvent::UploadRequest { alias, id } => { + let sender = + self + .state + .get() + .unwrap() + .upload_requests + .lock() + .await + .get(&id).ok_or(LocalSendError::SessionInactive)?.clone(); + + // TODO: replace this with ratatui widget dialog + let upload_confirmed = MessageDialogBuilder::default() + .set_title(&alias) + .set_text("Do you want to receive files from this device?") + .confirm() + .show() + .unwrap(); + + if upload_confirmed { + let _ = sender.send(UploadDialog::UploadConfirm); + } else { + let _ = sender.send(UploadDialog::UploadDeny); + } + } + TransferEvent::Sent => {} + _ => {} + } + } + } _ = tokio::time::sleep(Duration::from_millis(200)) => {} } diff --git a/src/lib.rs b/src/lib.rs index b957cb2..b6ec164 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,11 +10,15 @@ use std::{ sync::{Arc, OnceLock}, }; +use julid::Julid; use models::Device; use serde::{Deserialize, Serialize}; use tokio::{ net::UdpSocket, - sync::{Mutex, mpsc}, + sync::{ + Mutex, + mpsc::{self, UnboundedSender}, + }, task::JoinSet, }; use transfer::Session; @@ -33,6 +37,19 @@ pub enum Listeners { Multicast, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum UploadDialog { + UploadDeny, + UploadConfirm, +} + +pub enum TransferEvent { + Sent, + Received, + Failed, + UploadRequest { alias: String, id: Julid }, +} + /// Contains the main network and backend state for an application session. #[derive(Clone)] pub struct JoecalState { @@ -42,11 +59,18 @@ pub struct JoecalState { pub running_state: Arc>, pub socket: Arc, pub client: reqwest::Client, - stop_tx: OnceLock, + pub upload_requests: Arc>>>, + shutdown_sender: OnceLock, + // the receiving end will be held by the application so it can update the UI based on backend + // events + transfer_event_tx: UnboundedSender, } impl JoecalState { - pub async fn new(device: Device) -> crate::error::Result { + pub async fn new( + device: Device, + transfer_event_tx: UnboundedSender, + ) -> crate::error::Result { let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?; socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(2)?; // one hop out from localnet @@ -59,7 +83,9 @@ impl JoecalState { peers: Default::default(), sessions: Default::default(), running_state: Default::default(), - stop_tx: Default::default(), + shutdown_sender: Default::default(), + upload_requests: Default::default(), + transfer_event_tx, }) } @@ -68,7 +94,7 @@ impl JoecalState { let konfig = config.clone(); handles.spawn({ let (tx, shutdown_rx) = mpsc::channel(1); - self.stop_tx.get_or_init(|| tx); + let _ = self.shutdown_sender.set(tx); async move { if let Err(e) = state.start_http_server(shutdown_rx, &konfig).await { eprintln!("HTTP server error: {e}"); @@ -110,7 +136,7 @@ impl JoecalState { let mut rstate = self.running_state.lock().await; *rstate = RunningState::Stopping; let _ = self - .stop_tx + .shutdown_sender .get() .expect("Could not get stop signal transmitter") .send(()) diff --git a/src/transfer.rs b/src/transfer.rs index de88f30..c47b289 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -8,11 +8,11 @@ use axum::{ response::IntoResponse, }; use julid::Julid; -use native_dialog::MessageDialogBuilder; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::unbounded_channel; use crate::{ - JoecalState, + JoecalState, TransferEvent, UploadDialog, error::{LocalSendError, Result}, models::{Device, FileMetadata}, }; @@ -62,7 +62,6 @@ impl JoecalState { } let peer = self.peers.lock().await.get(&peer).unwrap().clone(); - println!("Peer: {peer:?}"); let response = self .client @@ -196,15 +195,34 @@ pub async fn register_prepare_upload( ) -> impl IntoResponse { println!("Received upload request from alias: {}", req.info.alias); - let result = MessageDialogBuilder::default() - .set_title(&req.info.alias) - .set_text("Do you want to receive files from this device?") - .confirm() - .show() - .unwrap(); + let id = Julid::new(); + let (tx, mut rx) = unbounded_channel(); + state + .upload_requests + .lock() + .await + .entry(id) + .insert_entry(tx); - if result { - let session_id = Julid::new().to_string(); + let dialog_send = state.transfer_event_tx.send(TransferEvent::UploadRequest { + alias: req.info.alias.clone(), + id, + }); + match dialog_send { + Ok(_) => {} + Err(_e) => { + let _ = state.upload_requests.lock().await.remove(&id); + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + } + + // safe to unwrap because it's only `None` when there are no more transmitters, + // and we still have the `tx` we created earlier + let result = rx.recv().await.unwrap(); + let _ = state.upload_requests.lock().await.remove(&id); + + if result == UploadDialog::UploadConfirm { + let session_id = id.as_string(); let file_tokens: HashMap = req .files