don't block the main thread on event handling

This commit is contained in:
Joe Ardent 2025-08-10 12:23:29 -07:00
parent d84a046ec9
commit ffab0f261a
5 changed files with 43 additions and 45 deletions

View file

@ -25,7 +25,6 @@ pub struct Peer {
pub struct App { pub struct App {
pub service: JocalService, pub service: JocalService,
pub events: EventStream, pub events: EventStream,
// addr -> (alias, fingerprint)
pub peers: Vec<Peer>, pub peers: Vec<Peer>,
pub peer_state: ListState, pub peer_state: ListState,
pub receive_requests: BTreeMap<Julid, ReceiveRequest>, pub receive_requests: BTreeMap<Julid, ReceiveRequest>,

View file

@ -1,4 +1,4 @@
use std::{net::SocketAddr, path::Path, time::Duration}; use std::{net::SocketAddr, path::Path};
use axum::{ use axum::{
Json, Router, Json, Router,
@ -6,7 +6,6 @@ use axum::{
routing::{get, post}, routing::{get, post},
}; };
use axum_server::{Handle, tls_rustls::RustlsConfig}; use axum_server::{Handle, tls_rustls::RustlsConfig};
use tokio::sync::mpsc;
use tokio_rustls::rustls::{ use tokio_rustls::rustls::{
ServerConfig, ServerConfig,
pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject}, pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject},
@ -16,11 +15,11 @@ use tower_http::limit::RequestBodyLimitLayer;
use crate::{ use crate::{
JocalService, JocalService,
discovery::register_device, discovery::register_device,
transfer::{prepare_upload, receive_upload}, transfer::{handle_prepare_upload, handle_receive_upload},
}; };
impl JocalService { impl JocalService {
pub async fn start_http_server(&self, stop_rx: mpsc::Receiver<()>) -> crate::error::Result<()> { pub async fn start_http_server(&self) -> crate::error::Result<()> {
let app = self.create_router(); let app = self.create_router();
// TODO: make addr config // TODO: make addr config
let addr = SocketAddr::from(([0, 0, 0, 0], self.config.device.port)); let addr = SocketAddr::from(([0, 0, 0, 0], self.config.device.port));
@ -29,8 +28,8 @@ impl JocalService {
let ssl_config = rustls_server_config(key, cert); let ssl_config = rustls_server_config(key, cert);
let handle = Handle::new(); let handle = Handle::new();
self.http_handle.get_or_init(|| handle.clone());
tokio::spawn(shutdown(handle.clone(), stop_rx)); log::info!("starting http server");
axum_server::bind_rustls(addr, ssl_config) axum_server::bind_rustls(addr, ssl_config)
.handle(handle) .handle(handle)
@ -54,8 +53,11 @@ impl JocalService {
"/api/localsend/v1/info", "/api/localsend/v1/info",
get(move || async move { Json(d2) }), get(move || async move { Json(d2) }),
) )
.route("/api/localsend/v2/prepare-upload", post(prepare_upload)) .route(
.route("/api/localsend/v2/upload", post(receive_upload)) "/api/localsend/v2/prepare-upload",
post(handle_prepare_upload),
)
.route("/api/localsend/v2/upload", post(handle_receive_upload))
.layer(DefaultBodyLimit::disable()) .layer(DefaultBodyLimit::disable())
.layer(RequestBodyLimitLayer::new(1024 * 1024 * 1024)) .layer(RequestBodyLimitLayer::new(1024 * 1024 * 1024))
.with_state(self.clone()) .with_state(self.clone())
@ -86,9 +88,3 @@ fn rustls_server_config(key: impl AsRef<Path>, cert: impl AsRef<Path>) -> Rustls
RustlsConfig::from_config(config.into()) RustlsConfig::from_config(config.into())
} }
async fn shutdown(handle: Handle, mut rx: mpsc::Receiver<()>) {
let _ = rx.recv().await;
log::info!("shutting down http server");
handle.graceful_shutdown(Some(Duration::from_secs(5)));
}

View file

@ -10,6 +10,7 @@ use std::{
fmt::Debug, fmt::Debug,
net::{Ipv4Addr, SocketAddr, SocketAddrV4}, net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::{Arc, OnceLock}, sync::{Arc, OnceLock},
time::Duration,
}; };
pub use config::Config; pub use config::Config;
@ -78,7 +79,7 @@ pub struct JocalService {
pub socket: Arc<UdpSocket>, pub socket: Arc<UdpSocket>,
pub client: reqwest::Client, pub client: reqwest::Client,
pub config: Config, pub config: Config,
shutdown_sender: OnceLock<ShutdownSender>, http_handle: Arc<OnceLock<axum_server::Handle>>,
// the receiving end will be held by the application so it can update the UI based on backend // the receiving end will be held by the application so it can update the UI based on backend
// events // events
transfer_event_tx: UnboundedSender<TransferEvent>, transfer_event_tx: UnboundedSender<TransferEvent>,
@ -107,7 +108,7 @@ impl JocalService {
peers: Default::default(), peers: Default::default(),
sessions: Default::default(), sessions: Default::default(),
running_state: Default::default(), running_state: Default::default(),
shutdown_sender: Default::default(), http_handle: Default::default(),
}) })
} }
@ -115,10 +116,8 @@ impl JocalService {
let service = self.clone(); let service = self.clone();
handles.spawn({ handles.spawn({
let (tx, shutdown_rx) = mpsc::channel(1);
let _ = self.shutdown_sender.set(tx);
async move { async move {
if let Err(e) = service.start_http_server(shutdown_rx).await { if let Err(e) = service.start_http_server().await {
error!("HTTP server error: {e}"); error!("HTTP server error: {e}");
} }
Listeners::Http Listeners::Http
@ -146,7 +145,7 @@ impl JocalService {
if let Err(e) = service.announce(None).await { if let Err(e) = service.announce(None).await {
error!("Announcement error: {e}"); error!("Announcement error: {e}");
} }
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
} }
Listeners::Udp Listeners::Udp
} }
@ -156,12 +155,11 @@ impl JocalService {
pub async fn stop(&self) { pub async fn stop(&self) {
let mut rstate = self.running_state.lock().await; let mut rstate = self.running_state.lock().await;
*rstate = RunningState::Stopping; *rstate = RunningState::Stopping;
let _ = self log::info!("shutting down http server");
.shutdown_sender self.http_handle
.get() .get()
.expect("Could not get stop signal transmitter") .expect("missing http handle for shutdown")
.send(()) .graceful_shutdown(Some(Duration::from_secs(5)));
.await;
} }
pub async fn refresh_peers(&self) { pub async fn refresh_peers(&self) {

View file

@ -1,4 +1,4 @@
use std::{path::Path, str::FromStr}; use std::{path::Path, str::FromStr, time::Duration};
use clap::Parser; use clap::Parser;
use jocalsend::{Config, JocalService, Listeners, error::Result}; use jocalsend::{Config, JocalService, Listeners, error::Result};
@ -59,9 +59,16 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result
let mut handles = JoinSet::new(); let mut handles = JoinSet::new();
app.service.start(&mut handles).await; app.service.start(&mut handles).await;
let mut alarm = tokio::time::interval(Duration::from_millis(200));
loop { loop {
terminal.draw(|frame| app.draw(frame))?; terminal.draw(|frame| app.draw(frame))?;
app.handle_events().await?;
tokio::select! {
res = app.handle_events() => {
res?;
}
_ = alarm.tick() => {}
}
if app.screen() == CurrentScreen::Stopping { if app.screen() == CurrentScreen::Stopping {
break; break;
@ -97,7 +104,7 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result
} }
async fn shutdown(handles: &mut JoinSet<Listeners>) { async fn shutdown(handles: &mut JoinSet<Listeners>) {
let mut alarm = tokio::time::interval(tokio::time::Duration::from_secs(5)); let mut alarm = tokio::time::interval(Duration::from_secs(5));
alarm.tick().await; alarm.tick().await;
loop { loop {
tokio::select! { tokio::select! {
@ -120,14 +127,8 @@ async fn shutdown(handles: &mut JoinSet<Listeners>) {
} }
fn set_file_selection(path: &Path, explorer: &mut FileExplorer) { fn set_file_selection(path: &Path, explorer: &mut FileExplorer) {
if path.is_absolute() {
let parent = path.parent().map(|f| f.to_path_buf()).unwrap_or("/".into()); let parent = path.parent().map(|f| f.to_path_buf()).unwrap_or("/".into());
let _ = explorer.set_cwd(parent); let _ = explorer.set_cwd(parent);
} else {
let parent = path.parent().map(|f| f.to_path_buf()).unwrap_or("/".into());
let _ = explorer.set_cwd(parent);
};
let files = explorer.files(); let files = explorer.files();
let mut idx = None; let mut idx = None;
for (i, f) in files.iter().enumerate() { for (i, f) in files.iter().enumerate() {
@ -136,7 +137,6 @@ fn set_file_selection(path: &Path, explorer: &mut FileExplorer) {
break; break;
} }
} }
if let Some(idx) = idx { if let Some(idx) = idx {
explorer.set_selected_idx(idx); explorer.set_selected_idx(idx);
} }

View file

@ -62,6 +62,8 @@ impl JocalService {
return Err(LocalSendError::PeerNotFound); return Err(LocalSendError::PeerNotFound);
}; };
log::debug!("preparing upload request");
let request = self let request = self
.client .client
.post(format!( .post(format!(
@ -71,12 +73,12 @@ impl JocalService {
.json(&PrepareUploadRequest { .json(&PrepareUploadRequest {
info: self.config.device.clone(), info: self.config.device.clone(),
files: files.clone(), files: files.clone(),
}); })
.timeout(Duration::from_secs(30));
let r = request.timeout(Duration::from_secs(30)).build().unwrap(); debug!("sending '{request:?}' to peer at {addr:?}");
debug!("sending '{r:?}' to peer at {addr:?}");
let response = self.client.execute(r).await?; let response = request.send().await?;
debug!("Response: {response:?}"); debug!("Response: {response:?}");
@ -230,12 +232,15 @@ impl JocalService {
} }
} }
pub async fn prepare_upload( pub async fn handle_prepare_upload(
State(service): State<JocalService>, State(service): State<JocalService>,
ConnectInfo(addr): ConnectInfo<SocketAddr>, ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(req): Json<PrepareUploadRequest>, Json(req): Json<PrepareUploadRequest>,
) -> impl IntoResponse { ) -> impl IntoResponse {
info!("Received upload request from alias: {}", req.info.alias); info!(
"Received upload request from {} at {addr:?}",
req.info.alias
);
let id = Julid::new(); let id = Julid::new();
let (tx, mut rx) = unbounded_channel(); let (tx, mut rx) = unbounded_channel();
@ -300,7 +305,7 @@ pub async fn prepare_upload(
.into_response() .into_response()
} }
pub async fn receive_upload( pub async fn handle_receive_upload(
Query(params): Query<UploadParams>, Query(params): Query<UploadParams>,
State(service): State<JocalService>, State(service): State<JocalService>,
body: Bytes, body: Bytes,