From 7eece474a326091c8d1ac354908aa2212a9ba024 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Thu, 14 Aug 2025 16:25:32 -0700 Subject: [PATCH] do blocking requests in spawned task --- src/app/mod.rs | 15 ++- src/discovery.rs | 11 +- src/lib.rs | 13 ++- src/main.rs | 53 +++++----- src/models.rs | 10 +- src/transfer.rs | 256 ++++++++++++++++++++++------------------------- 6 files changed, 180 insertions(+), 178 deletions(-) diff --git a/src/app/mod.rs b/src/app/mod.rs index 2241dfa..4ba9afb 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -2,14 +2,11 @@ use std::{ collections::BTreeMap, net::SocketAddr, path::{Path, PathBuf}, - time::Duration, }; use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use futures::{FutureExt, StreamExt}; -use jocalsend::{ - DEFAULT_INTERVAL, JocalEvent, JocalService, ReceiveDialog, ReceiveRequest, error::Result, -}; +use jocalsend::{JocalEvent, JocalService, ReceiveDialog, ReceiveRequest, error::Result}; use julid::Julid; use log::{LevelFilter, debug, error, warn}; use ratatui::{ @@ -117,16 +114,18 @@ impl App { } transfer_event = self.event_listener.recv().fuse() => { if let Some(event) = transfer_event { - debug!("got transferr event {event:?}"); + log::trace!("got JocalEvent {event:?}"); match event { JocalEvent::ReceiveRequest { id, request } => { self.receive_requests.insert(id, request); } - JocalEvent::Cancelled(id) | JocalEvent::ReceivedInbound(id) => { + JocalEvent::Cancelled { session_id: id } | JocalEvent::ReceivedInbound(id) => { self.receive_requests.remove(&id); } - JocalEvent::SendApproved(_id) => todo!(), - JocalEvent::SendDenied => todo!(), + JocalEvent::SendApproved(id) => log::info!("remote recipient approved outbound transfer {id}"), + JocalEvent::SendDenied => log::warn!("outbound transfer request has been denied"), + JocalEvent::SendSuccess { content, session: _session } => log::info!("successfully sent {content}"), + JocalEvent::SendFailed { error } => log::error!("could not send content: {error}"), JocalEvent::Tick => {} } } diff --git a/src/discovery.rs b/src/discovery.rs index 08c327c..9d6872c 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -6,8 +6,10 @@ use std::{ use axum::{ Json, extract::{ConnectInfo, State}, + response::IntoResponse, }; use log::{debug, error, trace, warn}; +use reqwest::StatusCode; use tokio::net::UdpSocket; use crate::{Config, DEFAULT_INTERVAL, JocalService, RunningState, models::Device}; @@ -61,7 +63,7 @@ impl JocalService { async fn process_device(&self, message: &str, src: SocketAddr, config: &Config) { if let Ok(device) = serde_json::from_str::(message) { - if device.fingerprint == self.config.device.fingerprint { + if device == self.config.device { return; } @@ -98,7 +100,10 @@ pub async fn register_device( State(service): State, ConnectInfo(addr): ConnectInfo, Json(device): Json, -) -> Json { +) -> impl IntoResponse { + if device == service.config.device { + return StatusCode::ALREADY_REPORTED.into_response(); + } let mut addr = addr; addr.set_port(service.config.device.port); service @@ -106,7 +111,7 @@ pub async fn register_device( .lock() .await .insert(device.fingerprint.clone(), (addr, device.clone())); - Json(device) + Json(device).into_response() } //-************************************************************************ diff --git a/src/lib.rs b/src/lib.rs index 4673e38..1d5559a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ use std::{ collections::BTreeMap, fmt::Debug, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + path::PathBuf, sync::{Arc, OnceLock}, time::Duration, }; @@ -50,12 +51,20 @@ pub enum ReceiveDialog { Deny, } +#[derive(Debug)] +pub enum SendingType { + File(PathBuf), + Text(String), +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum JocalEvent { ReceivedInbound(Julid), SendApproved(String), SendDenied, - Cancelled(Julid), + SendSuccess { content: String, session: String }, + SendFailed { error: String }, + Cancelled { session_id: Julid }, ReceiveRequest { id: Julid, request: ReceiveRequest }, Tick, } @@ -93,7 +102,7 @@ pub struct JocalService { pub socket: Arc, pub client: reqwest::Client, pub config: Config, - http_handle: OnceLock, + pub http_handle: Arc>, // the receiving end will be held by the application so it can update the UI based on backend // events transfer_event_tx: UnboundedSender, diff --git a/src/main.rs b/src/main.rs index 2ee9dcc..a1c3f56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,50 +55,43 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result let mut handles = JoinSet::new(); app.service.start(&mut handles).await; - let mut tick = tokio::time::interval(DEFAULT_INTERVAL); let shutdown = shutdown(&mut handles); let mut shutdown = std::pin::pin!(shutdown); loop { terminal.draw(|frame| app.draw(frame))?; - tokio::select! { - res = app.handle_events() => { - res?; - } - _ = tick.tick() => {} - } - if app.screen() == CurrentScreen::Stopping { - log::info!("shutting down"); tokio::select! { _ = shutdown.as_mut() => { break; } - _ = tick.tick() => {} + _ = tokio::time::sleep(DEFAULT_INTERVAL) => {} } - } + } else { + app.handle_events().await?; - let peers = app.service.peers.lock().await; - app.peers.clear(); - peers.iter().for_each(|(fingerprint, (addr, device))| { - let alias = device.alias.clone(); - let peer = Peer { - alias, - fingerprint: fingerprint.to_owned(), - addr: addr.to_owned(), - }; - app.peers.push(peer); - }); + let peers = app.service.peers.lock().await; + app.peers.clear(); + peers.iter().for_each(|(fingerprint, (addr, device))| { + let alias = device.alias.clone(); + let peer = Peer { + alias, + fingerprint: fingerprint.to_owned(), + addr: addr.to_owned(), + }; + app.peers.push(peer); + }); - let mut stale_rx_requests = Vec::with_capacity(app.receive_requests.len()); - let now = chrono::Utc::now().timestamp_millis() as u64; - for (id, request) in app.receive_requests.iter() { - if request.tx.is_closed() || (now - id.timestamp()) > 60_000 { - stale_rx_requests.push(*id); + let mut stale_rx_requests = Vec::with_capacity(app.receive_requests.len()); + let now = chrono::Utc::now().timestamp_millis() as u64; + for (id, request) in app.receive_requests.iter() { + if request.tx.is_closed() || (now - id.timestamp()) > 60_000 { + stale_rx_requests.push(*id); + } + } + for id in stale_rx_requests { + app.receive_requests.remove(&id); } - } - for id in stale_rx_requests { - app.receive_requests.remove(&id); } } diff --git a/src/models.rs b/src/models.rs index 03a0b70..5106cf4 100644 --- a/src/models.rs +++ b/src/models.rs @@ -81,7 +81,7 @@ impl FileMetadata { } } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum DeviceType { Mobile, @@ -110,6 +110,14 @@ pub struct Device { pub announce: Option, } +impl PartialEq for Device { + fn eq(&self, other: &Self) -> bool { + self.fingerprint == other.fingerprint + } +} + +impl Eq for Device {} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "lowercase")] pub enum Protocol { diff --git a/src/transfer.rs b/src/transfer.rs index 7cd4a8e..0cd62a1 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -9,11 +9,12 @@ use axum::{ }; use julid::Julid; use log::{debug, error, info, warn}; +use reqwest::Client; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; use crate::{ - JocalEvent, JocalService, Peers, ReceiveDialog, ReceiveRequest, Sessions, + JocalEvent, JocalService, Peers, ReceiveDialog, ReceiveRequest, SendingType, Sessions, error::{LocalSendError, Result}, models::{Device, FileMetadata}, }; @@ -53,112 +54,14 @@ pub struct PrepareUploadRequest { } impl JocalService { - pub async fn prepare_upload( - &self, - peer: &str, - files: BTreeMap, - ) -> Result { - let Some((addr, device)) = self.peers.lock().await.get(peer).cloned() else { - return Err(LocalSendError::PeerNotFound); - }; - - log::debug!("preparing upload request"); - - let request = self - .client - .post(format!( - "{}://{}/api/localsend/v2/prepare-upload", - device.protocol, addr - )) - .json(&PrepareUploadRequest { - info: self.config.device.clone(), - files: files.clone(), - }) - .timeout(Duration::from_secs(30)); - - debug!("sending '{request:?}' to peer at {addr:?}"); - - let response = request.send().await?; - - debug!("Response: {response:?}"); - - let response: PrepareUploadResponse = match response.json().await { - Err(e) => { - error!("got error deserializing response: {e:?}"); - return Err(LocalSendError::RequestError(e)); - } - Ok(r) => r, - }; - - debug!("decoded response: {response:?}"); - - let session = Session { - session_id: response.session_id.clone(), - files, - file_tokens: response.files.clone(), - receiver: device, - sender: self.config.device.clone(), - status: SessionStatus::Active, - addr, - }; - - self.sessions - .lock() - .await - .insert(response.session_id.clone(), session); - - Ok(response) - } - pub async fn send_file(&self, peer: &str, file_path: PathBuf) -> Result<()> { - let file_metadata = FileMetadata::from_path(&file_path)?; - let mut files = BTreeMap::new(); - files.insert(file_metadata.id.clone(), file_metadata.clone()); - - let prepare_response = self.prepare_upload(peer, files).await?; - - let token = prepare_response - .files - .get(&file_metadata.id) - .ok_or(LocalSendError::InvalidToken)?; - - let file_contents = tokio::fs::read(&file_path).await?; - let bytes = Bytes::from(file_contents); - - self.send_bytes( - &prepare_response.session_id, - &file_metadata.id, - token, - bytes, - ) - .await?; - - Ok(()) + let content = SendingType::File(file_path); + self.send_content(peer, content).await } pub async fn send_text(&self, peer: &str, text: &str) -> Result<()> { - let file_metadata = FileMetadata::from_text(text)?; - let mut files = BTreeMap::new(); - files.insert(file_metadata.id.clone(), file_metadata.clone()); - - let prepare_response = self.prepare_upload(peer, files).await?; - - let token = prepare_response - .files - .get(&file_metadata.id) - .ok_or(LocalSendError::InvalidToken)?; - - let bytes = Bytes::from(text.to_owned()); - - self.send_bytes( - &prepare_response.session_id, - &file_metadata.id, - token, - bytes, - ) - .await?; - - Ok(()) + let content = SendingType::Text(text.to_owned()); + self.send_content(peer, content).await } pub async fn cancel_upload(&self, session_id: &str) -> Result<()> { @@ -183,38 +86,86 @@ impl JocalService { Ok(()) } - async fn send_bytes( - &self, - session_id: &str, - content_id: &str, - token: &String, - body: Bytes, - ) -> Result<()> { - let sessions = self.sessions.lock().await; - let session = sessions.get(session_id).unwrap(); + // spawns a tokio task to wait for responses + async fn send_content(&self, peer: &str, content: SendingType) -> Result<()> { + let (metadata, bytes) = match content { + SendingType::File(path) => { + let contents = tokio::fs::read(&path).await?; + let bytes = Bytes::from(contents); + (FileMetadata::from_path(&path)?, bytes) + } + SendingType::Text(text) => (FileMetadata::from_text(&text)?, Bytes::from(text)), + }; - if session.status != SessionStatus::Active { - return Err(LocalSendError::SessionInactive); - } + let mut files = BTreeMap::new(); + files.insert(metadata.id.clone(), metadata.clone()); - if session.file_tokens.get(content_id) != Some(token) { - return Err(LocalSendError::InvalidToken); - } + let ourself = self.config.device.clone(); + let client = self.client.clone(); + let tx = self.transfer_event_tx.clone(); + let peer = peer.to_string(); + let sessions = self.sessions.clone(); + let peers = self.peers.clone(); - let request = self.client - .post(format!( - "{}://{}/api/localsend/v2/upload?sessionId={session_id}&fileId={content_id}&token={token}", - session.receiver.protocol, session.addr)) - .body(body).build()?; + tokio::task::spawn(async move { + fn send_tx(msg: JocalEvent, tx: &UnboundedSender) { + if let Err(e) = tx.send(msg.clone()) { + log::error!("got error sending {msg:?} to frontend: {e:?}"); + } + } - debug!("Uploading bytes: {request:?}"); - let response = self.client.execute(request).await?; + let prepare_response = + do_prepare_upload(ourself, &client, &peer, &peers, &sessions, files).await; - if response.status() != 200 { - warn!("Upload failed: {response:?}"); - return Err(LocalSendError::UploadFailed); - } + let prepare_response = match prepare_response { + Ok(r) => r, + Err(e) => { + log::debug!("got error from remote receiver: {e:?}"); + send_tx(JocalEvent::SendDenied, &tx); + return; + } + }; + send_tx(JocalEvent::SendApproved(metadata.id.clone()), &tx); + + let token = match prepare_response.files.get(&metadata.id) { + Some(t) => t, + None => { + log::warn!(""); + send_tx( + JocalEvent::SendFailed { + error: "missing token in prepare response from remote".into(), + }, + &tx, + ); + return; + } + }; + + let content_id = &metadata.id; + let session_id = prepare_response.session_id; + let resp = do_send_bytes(sessions, client, &session_id, content_id, token, bytes).await; + + match resp { + Ok(_) => { + send_tx( + JocalEvent::SendSuccess { + content: content_id.to_owned(), + session: session_id, + }, + &tx, + ); + } + Err(e) => { + send_tx( + JocalEvent::SendFailed { + error: format!("{e:?}"), + }, + &tx, + ); + } + } + }); Ok(()) } } @@ -384,7 +335,7 @@ pub async fn handle_cancel( session.status = SessionStatus::Cancelled; if let Ok(id) = Julid::from_str(¶ms.session_id) { - service.send_event(JocalEvent::Cancelled(id)); + service.send_event(JocalEvent::Cancelled { session_id: id }); }; StatusCode::OK.into_response() @@ -397,13 +348,50 @@ pub struct CancelParams { session_id: String, } +// free function that can be called inside a future in tokio::task::spawn() +async fn do_send_bytes( + sessions: Sessions, + client: Client, + session_id: &str, + content_id: &str, + token: &String, + body: Bytes, +) -> Result<()> { + let sessions = sessions.lock().await; + let session = sessions.get(session_id).unwrap(); + + if session.status != SessionStatus::Active { + return Err(LocalSendError::SessionInactive); + } + + if session.file_tokens.get(content_id) != Some(token) { + return Err(LocalSendError::InvalidToken); + } + + let request = client + .post(format!( + "{}://{}/api/localsend/v2/upload?sessionId={session_id}&fileId={content_id}&token={token}", + session.receiver.protocol, session.addr)) + .body(body); + + debug!("Uploading bytes: {request:?}"); + let response = request.send().await?; + + if response.status() != 200 { + log::trace!("non-200 remote response: {response:?}"); + return Err(LocalSendError::UploadFailed); + } + + Ok(()) +} + +// free function that can be called inside a future in tokio::task::spawn() async fn do_prepare_upload( ourself: Device, - client: reqwest::Client, - tx: UnboundedSender, + client: &reqwest::Client, peer: &str, - peers: Peers, - sessions: Sessions, + peers: &Peers, + sessions: &Sessions, files: BTreeMap, ) -> Result { let Some((addr, device)) = peers.lock().await.get(peer).cloned() else {