pub mod discovery; pub mod error; pub mod http_server; pub mod models; pub mod transfer; use std::{ collections::HashMap, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::{Arc, OnceLock}, time::Duration, }; use models::Device; use serde::{Deserialize, Serialize}; use tokio::{ net::UdpSocket, sync::{Mutex, mpsc}, task::JoinHandle, }; use transfer::Session; pub const DEFAULT_PORT: u16 = 53317; pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167); pub const LISTENING_SOCKET_ADDR: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::from_bits(0), DEFAULT_PORT); type ShutdownSender = mpsc::Sender<()>; /// Contains the main network and backend state for an application session. #[derive(Clone)] pub struct JoecalState { pub device: Device, pub peers: Arc>>, pub sessions: Arc>>, // Session ID to Session pub running_state: Arc>, pub socket: Arc, pub client: reqwest::Client, stop_tx: OnceLock, } impl JoecalState { pub async fn new(device: Device) -> crate::error::Result { let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?; socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(2)?; // one hop out from localnet socket.join_multicast_v4(MULTICAST_IP, Ipv4Addr::from_bits(0))?; Ok(Self { device, client: reqwest::Client::new(), socket: socket.into(), peers: Default::default(), sessions: Default::default(), running_state: Default::default(), stop_tx: Default::default(), }) } pub async fn start( &self, config: &Config, ) -> crate::error::Result<(JoinHandle<()>, JoinHandle<()>, JoinHandle<()>)> { let state = self.clone(); let konfig = config.clone(); let server_handle = { let (tx, shutdown_rx) = mpsc::channel(1); self.stop_tx.get_or_init(|| tx); tokio::spawn(async move { if let Err(e) = state.start_http_server(shutdown_rx, &konfig).await { eprintln!("HTTP server error: {e}"); } }) }; let state = self.clone(); let konfig = config.clone(); let udp_handle = { tokio::spawn(async move { if let Err(e) = state.listen_multicast(&konfig).await { eprintln!("UDP listener error: {e}"); } }) }; let state = self.clone(); let config = config.clone(); let announcement_handle = { tokio::spawn(async move { loop { let rstate = state.running_state.lock().await; if *rstate == RunningState::Stopping { break; } if let Err(e) = state.announce(None, &config).await { eprintln!("Announcement error: {e}"); } tokio::time::sleep(std::time::Duration::from_secs(5)).await; } }) }; Ok((server_handle, udp_handle, announcement_handle)) } pub async fn stop(&self) { loop { let mut rstate = self.running_state.lock().await; *rstate = RunningState::Stopping; if self .stop_tx .get() .expect("Could not get stop signal transmitter") .send(()) .await .is_ok() { break; } else { tokio::time::sleep(Duration::from_millis(777)).await; } } } pub async fn refresh_peers(&self) { let mut peers = self.peers.lock().await; peers.clear(); } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RunningState { Running, Stopping, } impl Default for RunningState { fn default() -> Self { Self::Running } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { pub multicast_addr: SocketAddrV4, pub port: u16, pub download_dir: String, } impl Default for Config { fn default() -> Self { let home = std::env::home_dir().unwrap_or("/tmp".into()); let dd = home.join("joecalsend-downloads"); Self { multicast_addr: SocketAddrV4::new(MULTICAST_IP, DEFAULT_PORT), port: DEFAULT_PORT, download_dir: dd.to_string_lossy().into(), } } }