From f42dcef0db9d4acc020b146a6ccc2647bbe1f468 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Sun, 21 Sep 2025 12:26:20 -0700 Subject: [PATCH] use rwlock instead of mutex, 2x faster transfer ensues --- src/discovery.rs | 6 +++--- src/lib.rs | 16 ++++++++-------- src/main.rs | 2 +- src/transfer.rs | 29 ++++++++++++++++------------- 4 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/discovery.rs b/src/discovery.rs index f89bfc7..de02fe5 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -37,7 +37,7 @@ impl JocalService { tokio::select! { _ = timeout.tick() => { let rstate = { - *self.running_state.lock().await + *self.running_state.read().await }; if rstate == RunningState::Stopping { @@ -71,7 +71,7 @@ impl JocalService { src.set_port(device.port); // Update the port to the one the device sent { - let mut peers = self.peers.lock().await; + let mut peers = self.peers.write().await; peers.insert(device.fingerprint.clone(), (src, device.clone())); } @@ -108,7 +108,7 @@ pub async fn register_device( addr.set_port(service.config.device.port); service .peers - .lock() + .write() .await .insert(device.fingerprint.clone(), (addr, device.clone())); Json(device).into_response() diff --git a/src/lib.rs b/src/lib.rs index 9bba52c..e9d21c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ use models::{Device, FileMetadata}; use tokio::{ net::UdpSocket, sync::{ - Mutex, + RwLock, mpsc::{self, UnboundedReceiver, UnboundedSender}, }, task::JoinSet, @@ -32,8 +32,8 @@ pub const DEFAULT_PORT: u16 = 53317; pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167); pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(100); -pub type Peers = Arc>>; -pub type Sessions = Arc>>; // Session ID to Session +pub type Peers = Arc>>; +pub type Sessions = Arc>>; // Session ID to Session #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum JocalTasks { @@ -96,7 +96,7 @@ impl Debug for ReceiveRequest { pub struct JocalService { pub peers: Peers, pub sessions: Sessions, - pub running_state: Arc>, + pub running_state: Arc>, pub socket: Arc, pub client: reqwest::Client, pub config: Config, @@ -167,7 +167,7 @@ impl JocalService { .send(JocalEvent::Tick) .unwrap_or_else(|e| log::warn!("could not send tick event: {e:?}")); - let rstate = service.running_state.lock().await; + let rstate = service.running_state.read().await; if *rstate == RunningState::Stopping { break; } @@ -184,7 +184,7 @@ impl JocalService { tokio::time::sleep(Duration::from_secs(2)).await; - let rstate = service.running_state.lock().await; + let rstate = service.running_state.read().await; if *rstate == RunningState::Stopping { break; } @@ -195,7 +195,7 @@ impl JocalService { pub async fn stop(&self) { { - let mut rstate = self.running_state.lock().await; + let mut rstate = self.running_state.write().await; *rstate = RunningState::Stopping; } log::info!("shutting down http server"); @@ -206,7 +206,7 @@ impl JocalService { } pub async fn clear_peers(&self) { - let mut peers = self.peers.lock().await; + let mut peers = self.peers.write().await; peers.clear(); } diff --git a/src/main.rs b/src/main.rs index 38fb569..ed45503 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,7 +68,7 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result } else { app.handle_events().await?; - let peers = app.service.peers.lock().await; + let peers = app.service.peers.read().await; app.peers.clear(); peers.iter().for_each(|(fingerprint, (addr, device))| { let alias = device.alias.clone(); diff --git a/src/transfer.rs b/src/transfer.rs index a8658c5..d034a68 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -19,7 +19,7 @@ use crate::{ models::{Device, FileMetadata}, }; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct Session { pub session_id: String, pub files: BTreeMap, @@ -30,7 +30,7 @@ pub struct Session { pub addr: SocketAddr, } -#[derive(PartialEq, Deserialize, Serialize)] +#[derive(PartialEq, Deserialize, Serialize, Clone, Copy)] pub enum SessionStatus { Pending, Active, @@ -65,7 +65,7 @@ impl JocalService { } pub async fn cancel_upload(&self, session_id: &str) -> Result<()> { - let sessions = self.sessions.lock().await; + let sessions = self.sessions.read().await; let session = sessions .get(session_id) .ok_or(LocalSendError::SessionNotFound)?; @@ -146,7 +146,7 @@ impl JocalService { log::info!( "sending {content_id} to {}", peers - .lock() + .read() .await .get(&peer) .map(|(_, peer)| peer.alias.as_str()) @@ -237,7 +237,7 @@ pub async fn handle_prepare_upload( service .sessions - .lock() + .write() .await .insert(session_id.clone(), session); @@ -262,10 +262,13 @@ pub async fn handle_receive_upload( let token = ¶ms.token; // Get session and validate - let mut sessions_lock = service.sessions.lock().await; - let session = match sessions_lock.get_mut(session_id) { - Some(session) => session, - None => return StatusCode::BAD_REQUEST.into_response(), + + let session = { + let lock = service.sessions.read().await; + match lock.get(session_id).cloned() { + Some(session) => session, + None => return StatusCode::BAD_REQUEST.into_response(), + } }; if session.status != SessionStatus::Active { @@ -326,7 +329,7 @@ pub async fn handle_cancel( Query(params): Query, State(service): State, ) -> impl IntoResponse { - let mut sessions_lock = service.sessions.lock().await; + let mut sessions_lock = service.sessions.write().await; let session = match sessions_lock.get_mut(¶ms.session_id) { Some(session) => session, None => return StatusCode::BAD_REQUEST.into_response(), @@ -359,7 +362,7 @@ async fn do_send_bytes( token: &String, body: Bytes, ) -> Result<()> { - let sessions = sessions.lock().await; + let sessions = sessions.read().await; let session = sessions.get(session_id).unwrap(); if session.status != SessionStatus::Active { @@ -396,7 +399,7 @@ async fn do_prepare_upload( sessions: &Sessions, files: BTreeMap, ) -> Result { - let Some((addr, device)) = peers.lock().await.get(peer).cloned() else { + let Some((addr, device)) = peers.read().await.get(peer).cloned() else { return Err(LocalSendError::PeerNotFound); }; @@ -441,7 +444,7 @@ async fn do_prepare_upload( }; sessions - .lock() + .write() .await .insert(response.session_id.clone(), session);