Use RwLock instead of Mutex; ~2x faster transfers ensue

This commit is contained in:
Joe Ardent 2025-09-21 12:26:20 -07:00
parent 00092dc97b
commit f42dcef0db
4 changed files with 28 additions and 25 deletions

View file

@ -37,7 +37,7 @@ impl JocalService {
tokio::select! { tokio::select! {
_ = timeout.tick() => { _ = timeout.tick() => {
let rstate = { let rstate = {
*self.running_state.lock().await *self.running_state.read().await
}; };
if rstate == RunningState::Stopping if rstate == RunningState::Stopping
{ {
@ -71,7 +71,7 @@ impl JocalService {
src.set_port(device.port); // Update the port to the one the device sent src.set_port(device.port); // Update the port to the one the device sent
{ {
let mut peers = self.peers.lock().await; let mut peers = self.peers.write().await;
peers.insert(device.fingerprint.clone(), (src, device.clone())); peers.insert(device.fingerprint.clone(), (src, device.clone()));
} }
@ -108,7 +108,7 @@ pub async fn register_device(
addr.set_port(service.config.device.port); addr.set_port(service.config.device.port);
service service
.peers .peers
.lock() .write()
.await .await
.insert(device.fingerprint.clone(), (addr, device.clone())); .insert(device.fingerprint.clone(), (addr, device.clone()));
Json(device).into_response() Json(device).into_response()

View file

@ -21,7 +21,7 @@ use models::{Device, FileMetadata};
use tokio::{ use tokio::{
net::UdpSocket, net::UdpSocket,
sync::{ sync::{
Mutex, RwLock,
mpsc::{self, UnboundedReceiver, UnboundedSender}, mpsc::{self, UnboundedReceiver, UnboundedSender},
}, },
task::JoinSet, task::JoinSet,
@ -32,8 +32,8 @@ pub const DEFAULT_PORT: u16 = 53317;
pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167); pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167);
pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(100); pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(100);
pub type Peers = Arc<Mutex<BTreeMap<String, (SocketAddr, Device)>>>; pub type Peers = Arc<RwLock<BTreeMap<String, (SocketAddr, Device)>>>;
pub type Sessions = Arc<Mutex<BTreeMap<String, Session>>>; // Session ID to Session pub type Sessions = Arc<RwLock<BTreeMap<String, Session>>>; // Session ID to Session
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JocalTasks { pub enum JocalTasks {
@ -96,7 +96,7 @@ impl Debug for ReceiveRequest {
pub struct JocalService { pub struct JocalService {
pub peers: Peers, pub peers: Peers,
pub sessions: Sessions, pub sessions: Sessions,
pub running_state: Arc<Mutex<RunningState>>, pub running_state: Arc<RwLock<RunningState>>,
pub socket: Arc<UdpSocket>, pub socket: Arc<UdpSocket>,
pub client: reqwest::Client, pub client: reqwest::Client,
pub config: Config, pub config: Config,
@ -167,7 +167,7 @@ impl JocalService {
.send(JocalEvent::Tick) .send(JocalEvent::Tick)
.unwrap_or_else(|e| log::warn!("could not send tick event: {e:?}")); .unwrap_or_else(|e| log::warn!("could not send tick event: {e:?}"));
let rstate = service.running_state.lock().await; let rstate = service.running_state.read().await;
if *rstate == RunningState::Stopping { if *rstate == RunningState::Stopping {
break; break;
} }
@ -184,7 +184,7 @@ impl JocalService {
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
let rstate = service.running_state.lock().await; let rstate = service.running_state.read().await;
if *rstate == RunningState::Stopping { if *rstate == RunningState::Stopping {
break; break;
} }
@ -195,7 +195,7 @@ impl JocalService {
pub async fn stop(&self) { pub async fn stop(&self) {
{ {
let mut rstate = self.running_state.lock().await; let mut rstate = self.running_state.write().await;
*rstate = RunningState::Stopping; *rstate = RunningState::Stopping;
} }
log::info!("shutting down http server"); log::info!("shutting down http server");
@ -206,7 +206,7 @@ impl JocalService {
} }
pub async fn clear_peers(&self) { pub async fn clear_peers(&self) {
let mut peers = self.peers.lock().await; let mut peers = self.peers.write().await;
peers.clear(); peers.clear();
} }

View file

@ -68,7 +68,7 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result
} else { } else {
app.handle_events().await?; app.handle_events().await?;
let peers = app.service.peers.lock().await; let peers = app.service.peers.read().await;
app.peers.clear(); app.peers.clear();
peers.iter().for_each(|(fingerprint, (addr, device))| { peers.iter().for_each(|(fingerprint, (addr, device))| {
let alias = device.alias.clone(); let alias = device.alias.clone();

View file

@ -19,7 +19,7 @@ use crate::{
models::{Device, FileMetadata}, models::{Device, FileMetadata},
}; };
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize, Clone)]
pub struct Session { pub struct Session {
pub session_id: String, pub session_id: String,
pub files: BTreeMap<String, FileMetadata>, pub files: BTreeMap<String, FileMetadata>,
@ -30,7 +30,7 @@ pub struct Session {
pub addr: SocketAddr, pub addr: SocketAddr,
} }
#[derive(PartialEq, Deserialize, Serialize)] #[derive(PartialEq, Deserialize, Serialize, Clone, Copy)]
pub enum SessionStatus { pub enum SessionStatus {
Pending, Pending,
Active, Active,
@ -65,7 +65,7 @@ impl JocalService {
} }
pub async fn cancel_upload(&self, session_id: &str) -> Result<()> { pub async fn cancel_upload(&self, session_id: &str) -> Result<()> {
let sessions = self.sessions.lock().await; let sessions = self.sessions.read().await;
let session = sessions let session = sessions
.get(session_id) .get(session_id)
.ok_or(LocalSendError::SessionNotFound)?; .ok_or(LocalSendError::SessionNotFound)?;
@ -146,7 +146,7 @@ impl JocalService {
log::info!( log::info!(
"sending {content_id} to {}", "sending {content_id} to {}",
peers peers
.lock() .read()
.await .await
.get(&peer) .get(&peer)
.map(|(_, peer)| peer.alias.as_str()) .map(|(_, peer)| peer.alias.as_str())
@ -237,7 +237,7 @@ pub async fn handle_prepare_upload(
service service
.sessions .sessions
.lock() .write()
.await .await
.insert(session_id.clone(), session); .insert(session_id.clone(), session);
@ -262,10 +262,13 @@ pub async fn handle_receive_upload(
let token = &params.token; let token = &params.token;
// Get session and validate // Get session and validate
let mut sessions_lock = service.sessions.lock().await;
let session = match sessions_lock.get_mut(session_id) { let session = {
Some(session) => session, let lock = service.sessions.read().await;
None => return StatusCode::BAD_REQUEST.into_response(), match lock.get(session_id).cloned() {
Some(session) => session,
None => return StatusCode::BAD_REQUEST.into_response(),
}
}; };
if session.status != SessionStatus::Active { if session.status != SessionStatus::Active {
@ -326,7 +329,7 @@ pub async fn handle_cancel(
Query(params): Query<CancelParams>, Query(params): Query<CancelParams>,
State(service): State<JocalService>, State(service): State<JocalService>,
) -> impl IntoResponse { ) -> impl IntoResponse {
let mut sessions_lock = service.sessions.lock().await; let mut sessions_lock = service.sessions.write().await;
let session = match sessions_lock.get_mut(&params.session_id) { let session = match sessions_lock.get_mut(&params.session_id) {
Some(session) => session, Some(session) => session,
None => return StatusCode::BAD_REQUEST.into_response(), None => return StatusCode::BAD_REQUEST.into_response(),
@ -359,7 +362,7 @@ async fn do_send_bytes(
token: &String, token: &String,
body: Bytes, body: Bytes,
) -> Result<()> { ) -> Result<()> {
let sessions = sessions.lock().await; let sessions = sessions.read().await;
let session = sessions.get(session_id).unwrap(); let session = sessions.get(session_id).unwrap();
if session.status != SessionStatus::Active { if session.status != SessionStatus::Active {
@ -396,7 +399,7 @@ async fn do_prepare_upload(
sessions: &Sessions, sessions: &Sessions,
files: BTreeMap<String, FileMetadata>, files: BTreeMap<String, FileMetadata>,
) -> Result<PrepareUploadResponse> { ) -> Result<PrepareUploadResponse> {
let Some((addr, device)) = peers.lock().await.get(peer).cloned() else { let Some((addr, device)) = peers.read().await.get(peer).cloned() else {
return Err(LocalSendError::PeerNotFound); return Err(LocalSendError::PeerNotFound);
}; };
@ -441,7 +444,7 @@ async fn do_prepare_upload(
}; };
sessions sessions
.lock() .write()
.await .await
.insert(response.session_id.clone(), session); .insert(response.session_id.clone(), session);