diff --git a/src/app/mod.rs b/src/app/mod.rs index 64f479c..31236b9 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -1,9 +1,9 @@ -use std::{collections::BTreeMap, net::SocketAddr, sync::OnceLock, time::Duration}; +use std::{collections::BTreeMap, net::SocketAddr, time::Duration}; use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use futures::{FutureExt, StreamExt}; use joecalsend::{ - Config, JoecalState, JoecalUploadRequest, Listeners, TransferEvent, UploadDialog, + Config, JoecalService, JoecalUploadRequest, Listeners, TransferEvent, UploadDialog, error::Result, models::Device, }; use julid::Julid; @@ -19,7 +19,7 @@ pub mod widgets; pub type Peers = BTreeMap; pub struct App { - pub state: OnceLock, + pub service: JoecalService, pub screen: Vec, pub events: EventStream, // addr -> (alias, fingerprint) @@ -28,7 +28,7 @@ pub struct App { upload_state: TableState, // for getting messages back from the web server or web client about things we've done; the // other end is held by the state - transfer_event_rx: OnceLock>, + event_listener: UnboundedReceiver, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -40,68 +40,71 @@ pub enum CurrentScreen { Logging, } -impl Default for App { - fn default() -> Self { - Self::new() +#[tokio::main] +pub async fn start_and_run( + terminal: &mut DefaultTerminal, + config: Config, + device: Device, +) -> Result<()> { + let (event_tx, event_listener) = unbounded_channel(); + + let service = JoecalService::new(device, event_tx) + .await + .expect("Could not create JoecalService"); + + let mut app = App::new(service, event_listener); + + let mut handles = JoinSet::new(); + app.service.start(&config, &mut handles).await; + loop { + terminal.draw(|frame| app.draw(frame))?; + app.handle_events().await?; + + if let Some(&top) = app.screen.last() + && top == CurrentScreen::Stopping + { + app.service.stop().await; + break; + } + + let peers = app.service.peers.lock().await; + app.peers.clear(); + peers.iter().for_each(|(fingerprint, (addr, device))| { + let alias = device.alias.clone(); + app.peers + .insert(addr.to_owned(), (alias, fingerprint.to_owned())); + }); + + let mut stale_uploads = Vec::new(); + let now = chrono::Utc::now().timestamp_millis() as u64; + for (id, request) in app.uploads.iter() { + if request.tx.is_closed() || (now - id.timestamp()) > 60_000 { + stale_uploads.push(*id); + } + } + for id in stale_uploads { + app.uploads.remove(&id); + } } + + shutdown(&mut handles).await; + + Ok(()) } impl App { - pub fn new() -> Self { + pub fn new(service: JoecalService, event_listener: UnboundedReceiver) -> Self { App { + service, + event_listener, screen: vec![CurrentScreen::Main], - state: Default::default(), events: Default::default(), peers: Default::default(), uploads: Default::default(), upload_state: Default::default(), - transfer_event_rx: Default::default(), } } - #[tokio::main] - pub async fn start_and_run( - &mut self, - terminal: &mut DefaultTerminal, - config: Config, - device: Device, - ) -> Result<()> { - let (transfer_event_tx, transfer_event_rx) = unbounded_channel(); - - let state = JoecalState::new(device, transfer_event_tx) - .await - .expect("Could not create JoecalState"); - - let _ = self.transfer_event_rx.set(transfer_event_rx); - - let mut handles = JoinSet::new(); - state.start(&config, &mut handles).await; - let _ = self.state.set(state); - loop { - terminal.draw(|frame| self.draw(frame))?; - self.handle_events().await?; - - if let Some(&top) = self.screen.last() - && top == CurrentScreen::Stopping - { - self.state.get().unwrap().stop().await; - break; - } - - let peers = self.state.get().unwrap().peers.lock().await; - self.peers.clear(); - peers.iter().for_each(|(fingerprint, (addr, device))| { - let alias = device.alias.clone(); - self.peers - .insert(addr.to_owned(), (alias, fingerprint.to_owned())); - }); - } - - shutdown(&mut handles).await; - - Ok(()) - } - async fn handle_events(&mut self) -> Result<()> { tokio::select! { event = self.events.next().fuse() => { @@ -116,7 +119,7 @@ impl App { } } } - transfer_event = self.transfer_event_rx.get_mut().unwrap().recv() => { + transfer_event = self.event_listener.recv() => { if let Some(event) = transfer_event { debug!("got transferr event {event:?}"); match event { diff --git a/src/discovery.rs b/src/discovery.rs index a99486f..77d49fb 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -9,11 +9,12 @@ use axum::{ extract::{ConnectInfo, State}, }; use log::{debug, error, trace, warn}; +use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig, V4IfAddr}; use tokio::net::UdpSocket; -use crate::{Config, JoecalState, RunningState, models::Device}; +use crate::{Config, JoecalService, RunningState, models::Device}; -impl JoecalState { +impl JoecalService { pub async fn announce( &self, socket: Option, @@ -96,7 +97,7 @@ impl JoecalState { /// Axum request handler for receiving other devices' registration requests. pub async fn register_device( - State(state): State, + State(state): State, ConnectInfo(addr): ConnectInfo, Json(device): Json, ) -> Json { @@ -134,3 +135,33 @@ async fn announce_multicast( socket.send_to(msg.as_bytes(), addr).await?; Ok(()) } + +/* +async fn announce_unicast( + device: &Device, + ip: Option, + client: reqwest::Client, +) -> crate::error::Result<()> { + // for enumerating subnet peers when multicast fails (https://github.com/localsend/protocol?tab=readme-ov-file#32-http-legacy-mode) + let std::net::IpAddr::V4(ip) = local_ip_address::local_ip()? else { + unreachable!() + }; + + let mut _network_ip = ip; + let nifs = NetworkInterface::show()?; + for addr in nifs.into_iter().flat_map(|i| i.addr) { + if let Addr::V4(V4IfAddr { + ip: ifip, + netmask: Some(netmask), + .. + }) = addr + && ip == ifip + { + _network_ip = ip & netmask; + break; + } + } + + todo!() +} +*/ diff --git a/src/error.rs b/src/error.rs index 5473ff8..dd9ed7b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -44,6 +44,9 @@ pub enum LocalSendError { #[error("Error getting local IP")] IpAddrError(#[from] local_ip_address::Error), + + #[error("Error getting network interface")] + NetworkInterfaceError(#[from] network_interface::Error), } pub type Result = std::result::Result; diff --git a/src/http_server.rs b/src/http_server.rs index e1182f6..dda9c94 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -9,12 +9,12 @@ use tokio::{net::TcpListener, sync::mpsc}; use tower_http::limit::RequestBodyLimitLayer; use crate::{ - Config, JoecalState, + Config, JoecalService, discovery::register_device, transfer::{register_prepare_upload, register_upload}, }; -impl JoecalState { +impl JoecalService { pub async fn start_http_server( &self, stop_rx: mpsc::Receiver<()>, diff --git a/src/lib.rs b/src/lib.rs index 4fb7ef9..706f737 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,21 +63,20 @@ pub struct JoecalUploadRequest { /// Contains the main network and backend state for an application session. #[derive(Clone)] -pub struct JoecalState { +pub struct JoecalService { pub device: Device, pub peers: Arc>>, pub sessions: Arc>>, // Session ID to Session pub running_state: Arc>, pub socket: Arc, pub client: reqwest::Client, - pub upload_requests: Arc>>, shutdown_sender: OnceLock, // the receiving end will be held by the application so it can update the UI based on backend // events transfer_event_tx: UnboundedSender, } -impl JoecalState { +impl JoecalService { pub async fn new( device: Device, transfer_event_tx: UnboundedSender, @@ -91,12 +90,11 @@ impl JoecalState { device, client: reqwest::Client::new(), socket: socket.into(), + transfer_event_tx, peers: Default::default(), sessions: Default::default(), running_state: Default::default(), shutdown_sender: Default::default(), - upload_requests: Default::default(), - transfer_event_tx, }) } @@ -141,9 +139,6 @@ impl JoecalState { Listeners::Udp } }); - - // TODO: add a task that periodically clears out the upload requests if - // they're too old; the keys are julids so they have the time in them } pub async fn stop(&self) { @@ -162,28 +157,6 @@ impl JoecalState { peers.clear(); } - pub async fn get_upload_request(&self, id: Julid) -> Option { - self.upload_requests.lock().await.get(&id).cloned() - } - - pub async fn clear_upload_request(&self, id: Julid) { - let _ = self.upload_requests.lock().await.remove(&id); - } - - /// Add a transmitter for an upload request confirmation dialog that the - /// application frontend can use to tell the Axum handler whether or not to - /// accept the upload. - /// - /// IMPORTANT! Be sure to call `clear_upload_request(id)` when you're done - /// getting an answer back/before you exit! - pub async fn add_upload_request(&self, id: Julid, request: JoecalUploadRequest) { - self.upload_requests - .lock() - .await - .entry(id) - .insert_entry(request); - } - pub fn send_event(&self, event: TransferEvent) { if let Err(e) = self.transfer_event_tx.send(event.clone()) { error!("got error sending transfer event '{event:?}': {e:?}"); diff --git a/src/main.rs b/src/main.rs index 14d736b..9e2b506 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,5 @@ -use app::App; use joecalsend::{Config, error, models::Device}; use local_ip_address::local_ip; -use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig, V4IfAddr}; use tui_logger::{LevelFilter, init_logger, set_env_filter_from_env}; mod app; @@ -9,10 +7,6 @@ mod app; fn main() -> error::Result<()> { let device = Device::default(); - let std::net::IpAddr::V4(ip) = local_ip()? else { - unreachable!() - }; - if std::env::var("RUST_LOG").is_err() { unsafe { std::env::set_var("RUST_LOG", "joecalsend"); @@ -21,28 +15,10 @@ fn main() -> error::Result<()> { init_logger(LevelFilter::Debug).map_err(|e| std::io::Error::other(format!("{e}")))?; set_env_filter_from_env(None); - // for enumerating subnet peers when multicast fails (https://github.com/localsend/protocol?tab=readme-ov-file#32-http-legacy-mode) - let mut _network_ip = ip; - let nifs = NetworkInterface::show().unwrap(); - for addr in nifs.into_iter().flat_map(|i| i.addr) { - if let Addr::V4(V4IfAddr { - ip: ifip, - netmask: Some(netmask), - .. - }) = addr - && ip == ifip - { - _network_ip = ip & netmask; - break; - } - } - let config = Config::default(); - let mut app = App::new(); - let mut terminal = ratatui::init(); - let result = app.start_and_run(&mut terminal, config, device); + let result = app::start_and_run(&mut terminal, config, device); ratatui::restore(); result diff --git a/src/transfer.rs b/src/transfer.rs index b4d5f02..b471dae 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::unbounded_channel; use crate::{ - JoecalState, JoecalUploadRequest, TransferEvent, UploadDialog, + JoecalService, JoecalUploadRequest, TransferEvent, UploadDialog, error::{LocalSendError, Result}, models::{Device, FileMetadata}, }; @@ -52,7 +52,7 @@ pub struct PrepareUploadRequest { pub files: HashMap, } -impl JoecalState { +impl JoecalService { pub async fn prepare_upload( &self, peer: String, @@ -190,7 +190,7 @@ impl JoecalState { } pub async fn register_prepare_upload( - State(state): State, + State(state): State, ConnectInfo(addr): ConnectInfo, Json(req): Json, ) -> impl IntoResponse { @@ -263,7 +263,7 @@ pub async fn register_prepare_upload( pub async fn register_upload( Query(params): Query, - State(state): State, + State(state): State, Extension(download_dir): Extension, body: Bytes, ) -> impl IntoResponse { @@ -339,7 +339,7 @@ pub struct UploadParams { pub async fn register_cancel( Query(params): Query, - State(state): State, + State(state): State, ) -> impl IntoResponse { let mut sessions_lock = state.sessions.lock().await; let session = match sessions_lock.get_mut(¶ms.session_id) {