diff --git a/src/discovery.rs b/src/discovery.rs new file mode 100644 index 0000000..d1eb83d --- /dev/null +++ b/src/discovery.rs @@ -0,0 +1,121 @@ +use std::{ + net::{SocketAddr, SocketAddrV4}, + sync::Arc, +}; + +use axum::{ + Json, + extract::{ConnectInfo, State}, +}; +use tokio::net::UdpSocket; + +use crate::{Config, JoecalState, RunningState, models::device::DeviceInfo}; + +impl JoecalState { + pub async fn announce( + &self, + socket: Option, + config: &Config, + ) -> crate::error::Result<()> { + announce_http(&self.device, socket, self.client.clone()).await?; + announce_multicast(&self.device, config.multicast_addr, self.socket.clone()).await?; + Ok(()) + } + + pub async fn listen_multicast(&self, config: &Config) -> crate::error::Result<()> { + let mut buf = [0; 65536]; + println!("Socket local addr: {:?}", self.socket.local_addr()?); + println!("Listening on multicast addr: {}", config.multicast_addr); + + loop { + match self.socket.recv_from(&mut buf).await { + Ok((size, src)) => { + let received_msg = String::from_utf8_lossy(&buf[..size]); + self.process_device(&received_msg, src, config).await; + } + Err(e) => { + eprintln!("Error receiving message: {e}"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + if let Ok(state) = self.running_state.try_lock() + && *state == RunningState::Stopping + { + break; + } + } + Ok(()) + } + + async fn process_device(&self, message: &str, src: SocketAddr, config: &Config) { + if let Ok(device) = serde_json::from_str::(message) { + if device.fingerprint == self.device.fingerprint { + return; + } + + let mut src = src; + src.set_port(device.port); // Update the port to the one the device sent + + let mut peers = self.peers.lock().await; + peers.insert(device.fingerprint.clone(), (src, device.clone())); + + if device.announce != Some(true) { + return; + } + + // Announce in return upon receiving a valid device message and it wants + // announcements + if let Err(e) = + announce_multicast(&device, config.multicast_addr, self.socket.clone()).await + { + eprintln!("Error during multicast announcement: {e}"); + } + if let Err(e) = announce_http(&device, Some(src), self.client.clone()).await { + eprintln!("Error during HTTP announcement: {e}"); + }; + } else { + eprintln!("Received invalid message: {message}"); + } + } +} + +/// Axum request handler for receiving other devices' registration requests. +pub async fn register_device( + State(state): State, + ConnectInfo(addr): ConnectInfo, + Json(device): Json, +) -> Json { + let mut addr = addr; + addr.set_port(state.device.port); + state + .peers + .lock() + .await + .insert(device.fingerprint.clone(), (addr, device.clone())); + Json(device) +} + +//-************************************************************************ +// private helpers +//-************************************************************************ +async fn announce_http( + device: &DeviceInfo, + ip: Option, + client: Arc, +) -> crate::error::Result<()> { + if let Some(ip) = ip { + let url = format!("http://{ip}/api/localsend/v2/register"); + client.post(&url).json(device).send().await?; + } + Ok(()) +} + +async fn announce_multicast( + device: &DeviceInfo, + addr: SocketAddrV4, + socket: Arc, +) -> crate::error::Result<()> { + let msg = device.to_json()?; + socket.send_to(msg.as_bytes(), addr).await?; + Ok(()) +} diff --git a/src/discovery/http.rs b/src/discovery/http.rs deleted file mode 100644 index 8219f3a..0000000 --- a/src/discovery/http.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::{net::SocketAddr, sync::Arc}; - -use axum::{ - Json, - extract::{ConnectInfo, State}, -}; - -use crate::{Client, JoecalState, models::device::DeviceInfo}; - -impl Client { - pub async fn announce_http(&self, ip: Option) -> crate::error::Result<()> { - if let Some(ip) = ip { - let url = format!("http://{ip}/api/localsend/v2/register"); - let client = reqwest::Client::new(); - client.post(&url).json(&self.device).send().await?; - } - Ok(()) - } - - pub async fn announce_http_legacy(&self) -> crate::error::Result<()> { - // send the reqwest to all local ip addresses from 192.168.0.0 to - // 192.168.255.255 - let mut address_list = Vec::new(); - for j in 0..256 { - for k in 0..256 { - address_list.push(format!("192.168.{j:03}.{k}:53317")); - } - } - - for ip in address_list { - let url = format!("http://{ip}/api/localsend/v2/register"); - self.http_client - .post(&url) - .json(&self.device) - .send() - .await?; - } - Ok(()) - } -} - -pub async fn register_device( - State(state): State>, - ConnectInfo(addr): ConnectInfo, - Json(device): Json, -) -> Json { - let mut addr = addr; - addr.set_port(state.device.port); - state - .peers - .lock() - .await - .insert(device.fingerprint.clone(), (addr, device.clone())); - Json(device) -} diff --git a/src/discovery/mod.rs b/src/discovery/mod.rs deleted file mode 100644 index a37f2ee..0000000 --- a/src/discovery/mod.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::net::SocketAddr; - -use crate::{Client, models::device::DeviceInfo}; - -pub mod http; -pub mod multicast; - -impl Client { - pub async fn announce(&self, socket: Option) -> crate::error::Result<()> { - self.announce_http(socket).await?; - self.announce_multicast().await?; - Ok(()) - } - - async fn process_device(&self, message: &str, src: SocketAddr) { - if let Ok(device) = serde_json::from_str::(message) { - if device.fingerprint == self.device.fingerprint { - return; - } - - let mut src = src; - src.set_port(device.port); // Update the port to the one the device sent - - let mut peers = self.peers.lock().await; - peers.insert(device.fingerprint.clone(), (src, device.clone())); - - if device.announce != Some(true) { - return; - } - - // Announce in return upon receiving a valid device message and it wants - // announcements - if let Err(e) = self.announce_multicast().await { - eprintln!("Error during multicast announcement: {e}"); - } - if let Err(e) = self.announce_http(Some(src)).await { - eprintln!("Error during HTTP announcement: {e}"); - }; - } else { - eprintln!("Received invalid message: {message}"); - } - } -} diff --git a/src/discovery/multicast.rs b/src/discovery/multicast.rs deleted file mode 100644 index 538beda..0000000 --- a/src/discovery/multicast.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::Client; - -impl Client { - pub async fn announce_multicast(&self) -> crate::error::Result<()> { - let msg = self.device.to_json()?; - let addr = self.multicast_addr; - self.socket.send_to(msg.as_bytes(), addr).await?; - Ok(()) - } - - pub async fn listen_multicast(&self) -> crate::error::Result<()> { - let mut buf = [0; 65536]; - println!("Socket local addr: {:?}", self.socket.local_addr()?); - println!("Listening on multicast addr: {}", self.multicast_addr); - - loop { - match self.socket.recv_from(&mut buf).await { - Ok((size, src)) => { - let received_msg = String::from_utf8_lossy(&buf[..size]); - self.process_device(&received_msg, src).await; - } - Err(e) => { - eprintln!("Error receiving message: {e}"); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - } - } - } -} diff --git a/src/lib.rs b/src/lib.rs index f17ccc8..f259ec8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,111 +10,116 @@ use std::{ sync::Arc, }; +use serde::{Deserialize, Serialize}; use tokio::{net::UdpSocket, sync::Mutex, task::JoinHandle}; use transfer::session::Session; use crate::models::device::DeviceInfo; -#[derive(Clone)] -pub struct Client { - pub device: DeviceInfo, - pub socket: Arc, - pub multicast_addr: SocketAddrV4, - pub port: u16, - pub peers: Arc>>, - pub sessions: Arc>>, // Session ID to Session - pub http_client: reqwest::Client, - pub download_dir: String, -} +pub const DEFAULT_PORT: u16 = 53317; +pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167); +pub const LISTENING_SOCKET_ADDR: SocketAddrV4 = + SocketAddrV4::new(Ipv4Addr::from_bits(0), DEFAULT_PORT); #[derive(Clone)] pub struct JoecalState { pub device: DeviceInfo, pub peers: Arc>>, pub sessions: Arc>>, // Session ID to Session + pub running_state: Arc>, + pub socket: Arc, + pub client: Arc, } -impl Client { - pub async fn default() -> crate::error::Result { - let device = DeviceInfo::default(); - let socket = UdpSocket::bind("0.0.0.0:53317").await?; +impl JoecalState { + pub async fn new(device: DeviceInfo) -> crate::error::Result { + let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?; socket.set_multicast_loop_v4(true)?; - socket.set_multicast_ttl_v4(255)?; - socket.join_multicast_v4(Ipv4Addr::new(224, 0, 0, 167), Ipv4Addr::new(0, 0, 0, 0))?; - let multicast_addr = SocketAddrV4::new(Ipv4Addr::new(224, 0, 0, 167), 53317); - let port = 53317; - let peers = Arc::new(Mutex::new(HashMap::new())); - let http_client = reqwest::Client::new(); - let sessions = Arc::new(Mutex::new(HashMap::new())); - let download_dir = "/home/localsend".to_string(); + socket.set_multicast_ttl_v4(2)?; // one hop out from localnet + socket.join_multicast_v4(MULTICAST_IP, Ipv4Addr::from_bits(0))?; Ok(Self { device, + peers: Default::default(), + sessions: Default::default(), + running_state: Default::default(), socket: socket.into(), - multicast_addr, - port, - peers, - http_client, - sessions, - download_dir, + client: reqwest::Client::new().into(), }) } +} - pub async fn with_config( - info: DeviceInfo, - port: u16, - download_dir: String, - ) -> crate::error::Result { - let socket = UdpSocket::bind(format!("0.0.0.0:{}", port.clone())).await?; - socket.set_multicast_loop_v4(true)?; - socket.set_multicast_ttl_v4(255)?; - socket.join_multicast_v4(Ipv4Addr::new(224, 0, 0, 167), Ipv4Addr::new(0, 0, 0, 0))?; - let multicast_addr = SocketAddrV4::new(Ipv4Addr::new(224, 0, 0, 167), port); - let peers = Arc::new(Mutex::new(HashMap::new())); - let http_client = reqwest::Client::new(); - let sessions = Arc::new(Mutex::new(HashMap::new())); +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RunningState { + Running, + Sending, + Receiving, + Stopping, +} - Ok(Self { - device: info, - socket: socket.into(), - multicast_addr, - port, - peers, - http_client, - sessions, - download_dir, - }) +impl Default for RunningState { + fn default() -> Self { + Self::Running } +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub multicast_addr: SocketAddrV4, + pub port: u16, + pub download_dir: String, +} + +impl Default for Config { + fn default() -> Self { + let home = std::env::home_dir().unwrap_or("/tmp".into()); + let dd = home.join("joecalsend-downloads"); + Self { + multicast_addr: SocketAddrV4::new(MULTICAST_IP, DEFAULT_PORT), + port: DEFAULT_PORT, + download_dir: dd.to_string_lossy().into(), + } + } +} + +impl JoecalState { pub async fn start( &self, + config: &Config, ) -> crate::error::Result<(JoinHandle<()>, JoinHandle<()>, JoinHandle<()>)> { + let state = self.clone(); + let konfig = config.clone(); let server_handle = { - let client = self.clone(); tokio::spawn(async move { - if let Err(e) = client.start_http_server().await { + if let Err(e) = state.start_http_server(&konfig).await { eprintln!("HTTP server error: {e}"); } }) }; - + let state = self.clone(); + let konfig = config.clone(); let udp_handle = { - let client = self.clone(); tokio::spawn(async move { - if let Err(e) = client.listen_multicast().await { + if let Err(e) = state.listen_multicast(&konfig).await { eprintln!("UDP listener error: {e}"); } }) }; + let state = self.clone(); + let config = config.clone(); let announcement_handle = { - let client = self.clone(); tokio::spawn(async move { loop { - if let Err(e) = client.announce(None).await { + if let Err(e) = state.announce(None, &config).await { eprintln!("Announcement error: {e}"); } tokio::time::sleep(std::time::Duration::from_secs(5)).await; + if let Ok(lock) = state.running_state.try_lock() + && *lock == RunningState::Stopping + { + break; + } } }) }; diff --git a/src/main.rs b/src/main.rs index 6c275fd..ff62d58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ -use std::{io, sync::Arc}; +use std::io; -use joecalsend::{Client, JoecalState, error, models::device::DeviceInfo}; +use joecalsend::{Config, JoecalState, RunningState, error, models::device::DeviceInfo}; use local_ip_address::local_ip; use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig, V4IfAddr}; use ratatui::{ @@ -40,12 +40,13 @@ async fn main() -> error::Result<()> { } dbg!(network_ip); - let client = Client::with_config(device, 53317, "/home/ardent/joecalsend".into()) + let state = JoecalState::new(device) .await - .unwrap(); - let (h1, h2, h3) = client.start().await.unwrap(); + .expect("Could not create application session"); + let config = Config::default(); + let (h1, h2, h3) = state.start(&config).await.unwrap(); - let mut app = App::new(Arc::new(client.clone())); + let mut app = App::new(state.clone()); let mut terminal = ratatui::init(); let result = app.run(&mut terminal); ratatui::restore(); @@ -55,34 +56,23 @@ async fn main() -> error::Result<()> { } struct App { - client: Arc, - runstate: AppRunState, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum AppRunState { - Running, - Stopping, -} - -impl Default for AppRunState { - fn default() -> Self { - Self::Running - } + state: JoecalState, } impl App { - pub fn new(client: Arc) -> Self { - App { - client, - runstate: AppRunState::Running, - } + pub fn new(state: JoecalState) -> Self { + App { state } } pub fn run(&mut self, terminal: &mut DefaultTerminal) -> io::Result<()> { - while AppRunState::Running == self.runstate { + loop { terminal.draw(|frame| self.draw(frame))?; self.handle_events()?; + if let Ok(lock) = self.state.running_state.try_lock() + && *lock == RunningState::Stopping + { + break; + } } Ok(()) } @@ -114,7 +104,12 @@ impl App { } fn exit(&mut self) { - self.runstate = AppRunState::Stopping + loop { + if let Ok(mut lock) = self.state.running_state.try_lock() { + *lock = RunningState::Stopping; + break; + } + } } } @@ -136,7 +131,12 @@ impl Widget for &App { .title_bottom(instructions.centered()) .border_set(border::THICK); - let rs = format!("{:?}", self.runstate); + let rs = self + .state + .running_state + .try_lock() + .map(|s| format!("{s:?}")) + .unwrap_or("Just a moment...".into()); let state_text = Text::from(vec![Line::from(vec!["runstate: ".into(), rs.yellow()])]); Paragraph::new(state_text) diff --git a/src/server/http.rs b/src/server/http.rs index 3202389..dcfe4b3 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use axum::{ Extension, Json, Router, @@ -9,15 +9,15 @@ use tokio::net::TcpListener; use tower_http::limit::RequestBodyLimitLayer; use crate::{ - Client, JoecalState, - discovery::http::register_device, + Config, JoecalState, + discovery::register_device, transfer::upload::{register_prepare_upload, register_upload}, }; -impl Client { - pub async fn start_http_server(&self) -> crate::error::Result<()> { - let app = self.create_router(); - let addr = SocketAddr::from(([0, 0, 0, 0], self.port)); +impl JoecalState { + pub async fn start_http_server(&self, config: &Config) -> crate::error::Result<()> { + let app = self.create_router(&config); + let addr = SocketAddr::from(([0, 0, 0, 0], config.port)); let listener = TcpListener::bind(&addr).await?; println!("HTTP server listening on {addr}"); @@ -30,15 +30,8 @@ impl Client { Ok(()) } - fn create_router(&self) -> Router { - let peers = self.peers.clone(); + fn create_router(&self, config: &Config) -> Router { let device = self.device.clone(); - let state = Arc::new(JoecalState { - peers, - device: device.clone(), - sessions: self.sessions.clone(), - }); - Router::new() .route("/api/localsend/v2/register", post(register_device)) .route( @@ -55,7 +48,7 @@ impl Client { .route("/api/localsend/v2/upload", post(register_upload)) .layer(DefaultBodyLimit::disable()) .layer(RequestBodyLimitLayer::new(1024 * 1024 * 1024)) - .layer(Extension(self.download_dir.clone())) - .with_state(state) + .layer(Extension(config.download_dir.clone())) + .with_state(self.clone()) } } diff --git a/src/transfer/upload.rs b/src/transfer/upload.rs index 8562af4..836d6a4 100644 --- a/src/transfer/upload.rs +++ b/src/transfer/upload.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{collections::HashMap, net::SocketAddr, path::PathBuf}; use axum::{ Extension, Json, @@ -12,7 +12,7 @@ use native_dialog::MessageDialogBuilder; use serde::{Deserialize, Serialize}; use crate::{ - Client, JoecalState, + JoecalState, error::{LocalSendError, Result}, models::{device::DeviceInfo, file::FileMetadata}, transfer::session::{Session, SessionStatus}, @@ -32,7 +32,7 @@ pub struct PrepareUploadRequest { pub files: HashMap, } -impl Client { +impl JoecalState { pub async fn prepare_upload( &self, peer: String, @@ -46,8 +46,8 @@ impl Client { println!("Peer: {peer:?}"); let response = self - .http_client - .post(&format!( + .client + .post(format!( "{}://{}/api/localsend/v2/prepare-upload", peer.1.protocol, peer.0.clone() @@ -99,8 +99,7 @@ impl Client { return Err(LocalSendError::InvalidToken); } - let request = self - .http_client + let request = self.client .post(format!( "{}://{}/api/localsend/v2/upload?sessionId={session_id}&fileId={file_id}&token={token}", session.receiver.protocol, session.addr)) @@ -155,7 +154,7 @@ impl Client { let session = sessions.get(&session_id).unwrap(); let request = self - .http_client + .client .post(format!( "{}://{}/api/localsend/v2/cancel?sessionId={}", session.receiver.protocol, session.addr, session_id @@ -172,7 +171,7 @@ impl Client { } pub async fn register_prepare_upload( - State(state): State>, + State(state): State, ConnectInfo(addr): ConnectInfo, Json(req): Json, ) -> impl IntoResponse { @@ -225,7 +224,7 @@ pub async fn register_prepare_upload( pub async fn register_upload( Query(params): Query, - State(state): State>, + State(state): State, Extension(download_dir): Extension, body: Bytes, ) -> impl IntoResponse {