diff --git a/src/app/mod.rs b/src/app/mod.rs index 50ee51c..96a972c 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -7,7 +7,7 @@ use std::{ use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use futures::{FutureExt, StreamExt}; -use jocalsend::{JocalService, ReceiveDialog, ReceiveRequest, TransferEvent, error::Result}; +use jocalsend::{JocalEvent, JocalService, ReceiveDialog, ReceiveRequest, error::Result}; use julid::Julid; use log::{LevelFilter, debug, error, warn}; use ratatui::{ @@ -54,7 +54,7 @@ pub struct App { receiving_state: TableState, // for getting messages back from the web server or web client about things we've done; the // other end is held by the service - event_listener: UnboundedReceiver, + event_listener: UnboundedReceiver, file_finder: FileFinder, text: Option, input: Input, @@ -83,7 +83,7 @@ pub enum FileMode { } impl App { - pub fn new(service: JocalService, event_listener: UnboundedReceiver) -> Self { + pub fn new(service: JocalService, event_listener: UnboundedReceiver) -> Self { App { service, event_listener, @@ -117,12 +117,15 @@ impl App { if let Some(event) = transfer_event { debug!("got transferr event {event:?}"); match event { - TransferEvent::ReceiveRequest { id, request } => { + JocalEvent::ReceiveRequest { id, request } => { self.receive_requests.insert(id, request); } - TransferEvent::Cancelled(id) | TransferEvent::Received(id) => { + JocalEvent::Cancelled(id) | JocalEvent::ReceivedInbound(id) => { self.receive_requests.remove(&id); } + JocalEvent::SendApproved(_id) => todo!(), + JocalEvent::SendDenied(_id) => todo!(), + JocalEvent::Tick => {} } } } diff --git a/src/discovery.rs b/src/discovery.rs index c139003..08c327c 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,7 +1,6 @@ use std::{ net::{SocketAddr, SocketAddrV4}, sync::Arc, - time::Duration, }; use axum::{ @@ -11,7 +10,7 @@ use axum::{ use log::{debug, error, trace, warn}; use tokio::net::UdpSocket; -use crate::{Config, JocalService, RunningState, models::Device}; +use crate::{Config, DEFAULT_INTERVAL, JocalService, RunningState, models::Device}; impl JocalService { pub async fn announce(&self, socket: Option) -> crate::error::Result<()> { @@ -29,7 +28,7 @@ impl JocalService { pub async fn listen_multicast(&self) -> crate::error::Result<()> { let mut buf = [0; 65536]; - let mut timeout = tokio::time::interval(Duration::from_secs(5)); + let mut timeout = tokio::time::interval(DEFAULT_INTERVAL); timeout.tick().await; loop { @@ -45,7 +44,6 @@ impl JocalService { } }, r = self.socket.recv_from(&mut buf) => { - trace!("received multicast datagram"); match r { Ok((size, src)) => { let received_msg = String::from_utf8_lossy(&buf[..size]); diff --git a/src/lib.rs b/src/lib.rs index 38aaf7e..31ef97d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,11 +32,14 @@ pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167); pub const LISTENING_SOCKET_ADDR: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::from_bits(0), DEFAULT_PORT); +pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(200); + #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Listeners { +pub enum JocalTasks { Udp, Http, Multicast, + Tick, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -46,10 +49,13 @@ pub enum ReceiveDialog { } #[derive(Debug, Clone)] -pub enum TransferEvent { - Received(Julid), +pub enum JocalEvent { + ReceivedInbound(Julid), + SendApproved(Julid), + SendDenied(Julid), Cancelled(Julid), ReceiveRequest { id: Julid, request: ReceiveRequest }, + Tick, } #[derive(Clone)] @@ -80,13 +86,13 @@ pub struct JocalService { http_handle: OnceLock, // the receiving end will be held by the application so it can update the UI based on backend // events - transfer_event_tx: UnboundedSender, + transfer_event_tx: UnboundedSender, } impl JocalService { pub async fn new( config: Config, - ) -> crate::error::Result<(Self, UnboundedReceiver)> { + ) -> crate::error::Result<(Self, UnboundedReceiver)> { let (tx, rx) = mpsc::unbounded_channel(); let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?; socket.set_multicast_loop_v4(true)?; @@ -113,43 +119,56 @@ impl JocalService { )) } - pub async fn start(&self, handles: &mut JoinSet) { + pub async fn start(&self, handles: &mut JoinSet) { let service = self.clone(); - handles.spawn({ - async move { - if let Err(e) = service.start_http_server().await { - error!("HTTP server error: {e}"); - } - Listeners::Http + handles.spawn(async move { + if let Err(e) = service.start_http_server().await { + error!("HTTP server error: {e}"); } + JocalTasks::Http }); let service = self.clone(); - handles.spawn({ - async move { - if let Err(e) = service.listen_multicast().await { - error!("UDP listener error: {e}"); - } - Listeners::Multicast + handles.spawn(async move { + if let Err(e) = service.listen_multicast().await { + error!("UDP listener error: {e}"); } + JocalTasks::Multicast }); let service = self.clone(); - handles.spawn({ - async move { - loop { - let rstate = service.running_state.lock().await; - if *rstate == RunningState::Stopping { - break; - } - if let Err(e) = service.announce(None).await { - error!("Announcement error: {e}"); - } - tokio::time::sleep(Duration::from_secs(5)).await; + handles.spawn(async move { + let service = &service; + let mut tick = tokio::time::interval(DEFAULT_INTERVAL); + + loop { + let rstate = service.running_state.lock().await; + if *rstate == RunningState::Stopping { + break; } - Listeners::Udp + tick.tick().await; + service + .transfer_event_tx + .send(JocalEvent::Tick) + .unwrap_or_else(|e| log::warn!("could not send tick event: {e:?}")); } + JocalTasks::Tick + }); + + let service = self.clone(); + handles.spawn(async move { + loop { + let rstate = service.running_state.lock().await; + if *rstate == RunningState::Stopping { + break; + } + if let Err(e) = service.announce(None).await { + error!("Announcement error: {e}"); + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + JocalTasks::Udp }); } @@ -168,7 +187,7 @@ impl JocalService { peers.clear(); } - pub fn send_event(&self, event: TransferEvent) { + pub fn send_event(&self, event: JocalEvent) { if let Err(e) = self.transfer_event_tx.send(event.clone()) { error!("got error sending transfer event '{event:?}': {e:?}"); } diff --git a/src/main.rs b/src/main.rs index a40114e..988a549 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use std::{path::Path, str::FromStr, time::Duration}; use clap::Parser; -use jocalsend::{Config, JocalService, Listeners, error::Result}; +use jocalsend::{Config, JocalService, JocalTasks, error::Result}; use log::{error, info}; use ratatui::DefaultTerminal; use ratatui_explorer::FileExplorer; @@ -105,7 +105,7 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result Ok(()) } -async fn shutdown(handles: &mut JoinSet) { +async fn shutdown(handles: &mut JoinSet) { let mut timeout = tokio::time::interval(Duration::from_secs(5)); timeout.tick().await; loop { diff --git a/src/transfer.rs b/src/transfer.rs index 1724df3..a153dc2 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::unbounded_channel; use crate::{ - JocalService, ReceiveDialog, ReceiveRequest, TransferEvent, + JocalEvent, JocalService, ReceiveDialog, ReceiveRequest, error::{LocalSendError, Result}, models::{Device, FileMetadata}, }; @@ -239,7 +239,7 @@ pub async fn handle_prepare_upload( match service .transfer_event_tx - .send(TransferEvent::ReceiveRequest { id, request }) + .send(JocalEvent::ReceiveRequest { id, request }) { Ok(_) => {} Err(e) => { @@ -354,7 +354,7 @@ pub async fn handle_receive_upload( } if let Ok(id) = Julid::from_str(session_id) { - service.send_event(TransferEvent::Received(id)); + service.send_event(JocalEvent::ReceivedInbound(id)); }; StatusCode::OK.into_response() @@ -384,7 +384,7 @@ pub async fn handle_cancel( session.status = SessionStatus::Cancelled; if let Ok(id) = Julid::from_str(¶ms.session_id) { - service.send_event(TransferEvent::Cancelled(id)); + service.send_event(JocalEvent::Cancelled(id)); }; StatusCode::OK.into_response()