use std::{ net::{SocketAddr, SocketAddrV4}, sync::Arc, time::Duration, }; use axum::{ Json, extract::{ConnectInfo, State}, }; use tokio::net::UdpSocket; use crate::{Config, JoecalState, RunningState, models::Device}; impl JoecalState { pub async fn announce( &self, socket: Option, config: &Config, ) -> crate::error::Result<()> { announce_http(&self.device, socket, self.client.clone()).await?; announce_multicast(&self.device, config.multicast_addr, self.socket.clone()).await?; Ok(()) } pub async fn listen_multicast(&self, config: &Config) -> crate::error::Result<()> { let mut buf = [0; 65536]; println!("Socket local addr: {:?}", self.socket.local_addr()?); println!("Listening on multicast addr: {}", config.multicast_addr); let mut timeout = tokio::time::interval(Duration::from_secs(5)); loop { tokio::select! { _ = timeout.tick() => { let rstate = self.running_state.lock().await; if *rstate == RunningState::Stopping { break; } }, r = self.socket.recv_from(&mut buf) => { match r { Ok((size, src)) => { let received_msg = String::from_utf8_lossy(&buf[..size]); self.process_device(&received_msg, src, config).await; } Err(e) => { eprintln!("Error receiving message: {e}"); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } } } } } Ok(()) } async fn process_device(&self, message: &str, src: SocketAddr, config: &Config) { if let Ok(device) = serde_json::from_str::(message) { if device.fingerprint == self.device.fingerprint { return; } let mut src = src; src.set_port(device.port); // Update the port to the one the device sent let mut peers = self.peers.lock().await; peers.insert(device.fingerprint.clone(), (src, device.clone())); if device.announce != Some(true) { return; } // Announce in return upon receiving a valid device message and it wants // announcements if let Err(e) = announce_multicast(&device, config.multicast_addr, self.socket.clone()).await { eprintln!("Error during multicast announcement: {e}"); } if let Err(e) = announce_http(&device, Some(src), self.client.clone()).await { eprintln!("Error during HTTP announcement: {e}"); }; } else { eprintln!("Received invalid message: {message}"); } } } /// Axum request handler for receiving other devices' registration requests. pub async fn register_device( State(state): State, ConnectInfo(addr): ConnectInfo, Json(device): Json, ) -> Json { let mut addr = addr; addr.set_port(state.device.port); state .peers .lock() .await .insert(device.fingerprint.clone(), (addr, device.clone())); Json(device) } //-************************************************************************ // private helpers //-************************************************************************ async fn announce_http( device: &Device, ip: Option, client: reqwest::Client, ) -> crate::error::Result<()> { if let Some(ip) = ip { let url = format!("http://{ip}/api/localsend/v2/register"); client.post(&url).json(device).send().await?; } Ok(()) } async fn announce_multicast( device: &Device, addr: SocketAddrV4, socket: Arc, ) -> crate::error::Result<()> { let msg = device.to_json()?; socket.send_to(msg.as_bytes(), addr).await?; Ok(()) }