joecalsend/src/discovery.rs
2025-07-28 15:51:00 -07:00

132 lines
4 KiB
Rust

use std::{
net::{SocketAddr, SocketAddrV4},
sync::Arc,
time::Duration,
};
use axum::{
Json,
extract::{ConnectInfo, State},
};
use tokio::net::UdpSocket;
use crate::{Config, JoecalState, RunningState, models::Device};
impl JoecalState {
pub async fn announce(
&self,
socket: Option<SocketAddr>,
config: &Config,
) -> crate::error::Result<()> {
announce_http(&self.device, socket, self.client.clone()).await?;
announce_multicast(&self.device, config.multicast_addr, self.socket.clone()).await?;
Ok(())
}
pub async fn listen_multicast(&self, config: &Config) -> crate::error::Result<()> {
let mut buf = [0; 65536];
let mut timeout = tokio::time::interval(Duration::from_secs(5));
timeout.tick().await;
loop {
tokio::select! {
_ = timeout.tick() => {
let rstate = {
*self.running_state.lock().await
};
if rstate == RunningState::Stopping
{
break;
}
},
r = self.socket.recv_from(&mut buf) => {
match r {
Ok((size, src)) => {
let received_msg = String::from_utf8_lossy(&buf[..size]);
self.process_device(&received_msg, src, config).await;
}
Err(e) => {
eprintln!("Error receiving message: {e}");
}
}
}
}
}
Ok(())
}
async fn process_device(&self, message: &str, src: SocketAddr, config: &Config) {
if let Ok(device) = serde_json::from_str::<Device>(message) {
if device.fingerprint == self.device.fingerprint {
return;
}
let mut src = src;
src.set_port(device.port); // Update the port to the one the device sent
{
let mut peers = self.peers.lock().await;
peers.insert(device.fingerprint.clone(), (src, device.clone()));
}
if device.announce != Some(true) {
return;
}
// Announce in return upon receiving a valid device message and it wants
// announcements
if let Err(e) =
announce_multicast(&device, config.multicast_addr, self.socket.clone()).await
{
eprintln!("Error during multicast announcement: {e}");
}
if let Err(e) = announce_http(&device, Some(src), self.client.clone()).await {
eprintln!("Error during HTTP announcement: {e}");
};
} else {
eprintln!("Received invalid message: {message}");
}
}
}
/// Axum request handler for receiving other devices' registration requests.
pub async fn register_device(
State(state): State<JoecalState>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(device): Json<Device>,
) -> Json<Device> {
let mut addr = addr;
addr.set_port(state.device.port);
state
.peers
.lock()
.await
.insert(device.fingerprint.clone(), (addr, device.clone()));
Json(device)
}
//-************************************************************************
// private helpers
//-************************************************************************
async fn announce_http(
device: &Device,
ip: Option<SocketAddr>,
client: reqwest::Client,
) -> crate::error::Result<()> {
if let Some(ip) = ip {
let url = format!("http://{ip}/api/localsend/v2/register");
client.post(&url).json(device).send().await?;
}
Ok(())
}
async fn announce_multicast(
device: &Device,
addr: SocketAddrV4,
socket: Arc<UdpSocket>,
) -> crate::error::Result<()> {
let msg = device.to_json()?;
socket.send_to(msg.as_bytes(), addr).await?;
Ok(())
}