From ffab0f261a63e2d15e4a5caaff6df8e38be75c8d Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Sun, 10 Aug 2025 12:23:29 -0700 Subject: [PATCH] don't block the main thread on event handling --- src/app/mod.rs | 1 - src/http_server.rs | 24 ++++++++++-------------- src/lib.rs | 20 +++++++++----------- src/main.rs | 24 ++++++++++++------------ src/transfer.rs | 19 ++++++++++++------- 5 files changed, 43 insertions(+), 45 deletions(-) diff --git a/src/app/mod.rs b/src/app/mod.rs index e1debe5..eb7b070 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -25,7 +25,6 @@ pub struct Peer { pub struct App { pub service: JocalService, pub events: EventStream, - // addr -> (alias, fingerprint) pub peers: Vec, pub peer_state: ListState, pub receive_requests: BTreeMap, diff --git a/src/http_server.rs b/src/http_server.rs index 13e8b2c..20954ab 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, path::Path, time::Duration}; +use std::{net::SocketAddr, path::Path}; use axum::{ Json, Router, @@ -6,7 +6,6 @@ use axum::{ routing::{get, post}, }; use axum_server::{Handle, tls_rustls::RustlsConfig}; -use tokio::sync::mpsc; use tokio_rustls::rustls::{ ServerConfig, pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject}, @@ -16,11 +15,11 @@ use tower_http::limit::RequestBodyLimitLayer; use crate::{ JocalService, discovery::register_device, - transfer::{prepare_upload, receive_upload}, + transfer::{handle_prepare_upload, handle_receive_upload}, }; impl JocalService { - pub async fn start_http_server(&self, stop_rx: mpsc::Receiver<()>) -> crate::error::Result<()> { + pub async fn start_http_server(&self) -> crate::error::Result<()> { let app = self.create_router(); // TODO: make addr config let addr = SocketAddr::from(([0, 0, 0, 0], self.config.device.port)); @@ -29,8 +28,8 @@ impl JocalService { let ssl_config = rustls_server_config(key, cert); let handle = Handle::new(); - - tokio::spawn(shutdown(handle.clone(), stop_rx)); + self.http_handle.get_or_init(|| handle.clone()); + log::info!("starting http server"); axum_server::bind_rustls(addr, ssl_config) .handle(handle) @@ -54,8 +53,11 @@ impl JocalService { "/api/localsend/v1/info", get(move || async move { Json(d2) }), ) - .route("/api/localsend/v2/prepare-upload", post(prepare_upload)) - .route("/api/localsend/v2/upload", post(receive_upload)) + .route( + "/api/localsend/v2/prepare-upload", + post(handle_prepare_upload), + ) + .route("/api/localsend/v2/upload", post(handle_receive_upload)) .layer(DefaultBodyLimit::disable()) .layer(RequestBodyLimitLayer::new(1024 * 1024 * 1024)) .with_state(self.clone()) @@ -86,9 +88,3 @@ fn rustls_server_config(key: impl AsRef, cert: impl AsRef) -> Rustls RustlsConfig::from_config(config.into()) } - -async fn shutdown(handle: Handle, mut rx: mpsc::Receiver<()>) { - let _ = rx.recv().await; - log::info!("shutting down http server"); - handle.graceful_shutdown(Some(Duration::from_secs(5))); -} diff --git a/src/lib.rs b/src/lib.rs index 87fe66e..c9b71a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ use std::{ fmt::Debug, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::{Arc, OnceLock}, + time::Duration, }; pub use config::Config; @@ -78,7 +79,7 @@ pub struct JocalService { pub socket: Arc, pub client: reqwest::Client, pub config: Config, - shutdown_sender: OnceLock, + http_handle: Arc>, // the receiving end will be held by the application so it can update the UI based on backend // events transfer_event_tx: UnboundedSender, @@ -107,7 +108,7 @@ impl JocalService { peers: Default::default(), sessions: Default::default(), running_state: Default::default(), - shutdown_sender: Default::default(), + http_handle: Default::default(), }) } @@ -115,10 +116,8 @@ impl JocalService { let service = self.clone(); handles.spawn({ - let (tx, shutdown_rx) = mpsc::channel(1); - let _ = self.shutdown_sender.set(tx); async move { - if let Err(e) = service.start_http_server(shutdown_rx).await { + if let Err(e) = service.start_http_server().await { error!("HTTP server error: {e}"); } Listeners::Http @@ -146,7 +145,7 @@ impl JocalService { if let Err(e) = service.announce(None).await { error!("Announcement error: {e}"); } - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(5)).await; } Listeners::Udp } @@ -156,12 +155,11 @@ impl JocalService { pub async fn stop(&self) { let mut rstate = self.running_state.lock().await; *rstate = RunningState::Stopping; - let _ = self - .shutdown_sender + log::info!("shutting down http server"); + self.http_handle .get() - .expect("Could not get stop signal transmitter") - .send(()) - .await; + .expect("missing http handle for shutdown") + .graceful_shutdown(Some(Duration::from_secs(5))); } pub async fn refresh_peers(&self) { diff --git a/src/main.rs b/src/main.rs index d23eaaf..f50d375 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{path::Path, str::FromStr}; +use std::{path::Path, str::FromStr, time::Duration}; use clap::Parser; use jocalsend::{Config, JocalService, Listeners, error::Result}; @@ -59,9 +59,16 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result let mut handles = JoinSet::new(); app.service.start(&mut handles).await; + let mut alarm = tokio::time::interval(Duration::from_millis(200)); loop { terminal.draw(|frame| app.draw(frame))?; - app.handle_events().await?; + + tokio::select! { + res = app.handle_events() => { + res?; + } + _ = alarm.tick() => {} + } if app.screen() == CurrentScreen::Stopping { break; @@ -97,7 +104,7 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result } async fn shutdown(handles: &mut JoinSet) { - let mut alarm = tokio::time::interval(tokio::time::Duration::from_secs(5)); + let mut alarm = tokio::time::interval(Duration::from_secs(5)); alarm.tick().await; loop { tokio::select! { @@ -120,14 +127,8 @@ async fn shutdown(handles: &mut JoinSet) { } fn set_file_selection(path: &Path, explorer: &mut FileExplorer) { - if path.is_absolute() { - let parent = path.parent().map(|f| f.to_path_buf()).unwrap_or("/".into()); - let _ = explorer.set_cwd(parent); - } else { - let parent = path.parent().map(|f| f.to_path_buf()).unwrap_or("/".into()); - let _ = explorer.set_cwd(parent); - }; - + let parent = path.parent().map(|f| f.to_path_buf()).unwrap_or("/".into()); + let _ = explorer.set_cwd(parent); let files = explorer.files(); let mut idx = None; for (i, f) in files.iter().enumerate() { @@ -136,7 +137,6 @@ fn set_file_selection(path: &Path, explorer: &mut FileExplorer) { break; } } - if let Some(idx) = idx { explorer.set_selected_idx(idx); } diff --git a/src/transfer.rs b/src/transfer.rs index 8e7152f..9d7b9b6 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -62,6 +62,8 @@ impl JocalService { return Err(LocalSendError::PeerNotFound); }; + log::debug!("preparing upload request"); + let request = self .client .post(format!( @@ -71,12 +73,12 @@ impl JocalService { .json(&PrepareUploadRequest { info: self.config.device.clone(), files: files.clone(), - }); + }) + .timeout(Duration::from_secs(30)); - let r = request.timeout(Duration::from_secs(30)).build().unwrap(); - debug!("sending '{r:?}' to peer at {addr:?}"); + debug!("sending '{request:?}' to peer at {addr:?}"); - let response = self.client.execute(r).await?; + let response = request.send().await?; debug!("Response: {response:?}"); @@ -230,12 +232,15 @@ impl JocalService { } } -pub async fn prepare_upload( +pub async fn handle_prepare_upload( State(service): State, ConnectInfo(addr): ConnectInfo, Json(req): Json, ) -> impl IntoResponse { - info!("Received upload request from alias: {}", req.info.alias); + info!( + "Received upload request from {} at {addr:?}", + req.info.alias + ); let id = Julid::new(); let (tx, mut rx) = unbounded_channel(); @@ -300,7 +305,7 @@ pub async fn prepare_upload( .into_response() } -pub async fn receive_upload( +pub async fn handle_receive_upload( Query(params): Query, State(service): State, body: Bytes,