do blocking requests in spawned task

This commit is contained in:
Joe Ardent 2025-08-14 16:25:32 -07:00
parent 56bc8e2fcb
commit 7eece474a3
6 changed files with 180 additions and 178 deletions

View file

@ -2,14 +2,11 @@ use std::{
collections::BTreeMap, collections::BTreeMap,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
time::Duration,
}; };
use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use jocalsend::{ use jocalsend::{JocalEvent, JocalService, ReceiveDialog, ReceiveRequest, error::Result};
DEFAULT_INTERVAL, JocalEvent, JocalService, ReceiveDialog, ReceiveRequest, error::Result,
};
use julid::Julid; use julid::Julid;
use log::{LevelFilter, debug, error, warn}; use log::{LevelFilter, debug, error, warn};
use ratatui::{ use ratatui::{
@ -117,16 +114,18 @@ impl App {
} }
transfer_event = self.event_listener.recv().fuse() => { transfer_event = self.event_listener.recv().fuse() => {
if let Some(event) = transfer_event { if let Some(event) = transfer_event {
debug!("got transferr event {event:?}"); log::trace!("got JocalEvent {event:?}");
match event { match event {
JocalEvent::ReceiveRequest { id, request } => { JocalEvent::ReceiveRequest { id, request } => {
self.receive_requests.insert(id, request); self.receive_requests.insert(id, request);
} }
JocalEvent::Cancelled(id) | JocalEvent::ReceivedInbound(id) => { JocalEvent::Cancelled { session_id: id } | JocalEvent::ReceivedInbound(id) => {
self.receive_requests.remove(&id); self.receive_requests.remove(&id);
} }
JocalEvent::SendApproved(_id) => todo!(), JocalEvent::SendApproved(id) => log::info!("remote recipient approved outbound transfer {id}"),
JocalEvent::SendDenied => todo!(), JocalEvent::SendDenied => log::warn!("outbound transfer request has been denied"),
JocalEvent::SendSuccess { content, session: _session } => log::info!("successfully sent {content}"),
JocalEvent::SendFailed { error } => log::error!("could not send content: {error}"),
JocalEvent::Tick => {} JocalEvent::Tick => {}
} }
} }

View file

@ -6,8 +6,10 @@ use std::{
use axum::{ use axum::{
Json, Json,
extract::{ConnectInfo, State}, extract::{ConnectInfo, State},
response::IntoResponse,
}; };
use log::{debug, error, trace, warn}; use log::{debug, error, trace, warn};
use reqwest::StatusCode;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use crate::{Config, DEFAULT_INTERVAL, JocalService, RunningState, models::Device}; use crate::{Config, DEFAULT_INTERVAL, JocalService, RunningState, models::Device};
@ -61,7 +63,7 @@ impl JocalService {
async fn process_device(&self, message: &str, src: SocketAddr, config: &Config) { async fn process_device(&self, message: &str, src: SocketAddr, config: &Config) {
if let Ok(device) = serde_json::from_str::<Device>(message) { if let Ok(device) = serde_json::from_str::<Device>(message) {
if device.fingerprint == self.config.device.fingerprint { if device == self.config.device {
return; return;
} }
@ -98,7 +100,10 @@ pub async fn register_device(
State(service): State<JocalService>, State(service): State<JocalService>,
ConnectInfo(addr): ConnectInfo<SocketAddr>, ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(device): Json<Device>, Json(device): Json<Device>,
) -> Json<Device> { ) -> impl IntoResponse {
if device == service.config.device {
return StatusCode::ALREADY_REPORTED.into_response();
}
let mut addr = addr; let mut addr = addr;
addr.set_port(service.config.device.port); addr.set_port(service.config.device.port);
service service
@ -106,7 +111,7 @@ pub async fn register_device(
.lock() .lock()
.await .await
.insert(device.fingerprint.clone(), (addr, device.clone())); .insert(device.fingerprint.clone(), (addr, device.clone()));
Json(device) Json(device).into_response()
} }
//-************************************************************************ //-************************************************************************

View file

@ -9,6 +9,7 @@ use std::{
collections::BTreeMap, collections::BTreeMap,
fmt::Debug, fmt::Debug,
net::{Ipv4Addr, SocketAddr, SocketAddrV4}, net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
sync::{Arc, OnceLock}, sync::{Arc, OnceLock},
time::Duration, time::Duration,
}; };
@ -50,12 +51,20 @@ pub enum ReceiveDialog {
Deny, Deny,
} }
#[derive(Debug)]
pub enum SendingType {
File(PathBuf),
Text(String),
}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum JocalEvent { pub enum JocalEvent {
ReceivedInbound(Julid), ReceivedInbound(Julid),
SendApproved(String), SendApproved(String),
SendDenied, SendDenied,
Cancelled(Julid), SendSuccess { content: String, session: String },
SendFailed { error: String },
Cancelled { session_id: Julid },
ReceiveRequest { id: Julid, request: ReceiveRequest }, ReceiveRequest { id: Julid, request: ReceiveRequest },
Tick, Tick,
} }
@ -93,7 +102,7 @@ pub struct JocalService {
pub socket: Arc<UdpSocket>, pub socket: Arc<UdpSocket>,
pub client: reqwest::Client, pub client: reqwest::Client,
pub config: Config, pub config: Config,
http_handle: OnceLock<axum_server::Handle>, pub http_handle: Arc<OnceLock<axum_server::Handle>>,
// the receiving end will be held by the application so it can update the UI based on backend // the receiving end will be held by the application so it can update the UI based on backend
// events // events
transfer_event_tx: UnboundedSender<JocalEvent>, transfer_event_tx: UnboundedSender<JocalEvent>,

View file

@ -55,28 +55,20 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result
let mut handles = JoinSet::new(); let mut handles = JoinSet::new();
app.service.start(&mut handles).await; app.service.start(&mut handles).await;
let mut tick = tokio::time::interval(DEFAULT_INTERVAL);
let shutdown = shutdown(&mut handles); let shutdown = shutdown(&mut handles);
let mut shutdown = std::pin::pin!(shutdown); let mut shutdown = std::pin::pin!(shutdown);
loop { loop {
terminal.draw(|frame| app.draw(frame))?; terminal.draw(|frame| app.draw(frame))?;
tokio::select! {
res = app.handle_events() => {
res?;
}
_ = tick.tick() => {}
}
if app.screen() == CurrentScreen::Stopping { if app.screen() == CurrentScreen::Stopping {
log::info!("shutting down");
tokio::select! { tokio::select! {
_ = shutdown.as_mut() => { _ = shutdown.as_mut() => {
break; break;
} }
_ = tick.tick() => {} _ = tokio::time::sleep(DEFAULT_INTERVAL) => {}
}
} }
} else {
app.handle_events().await?;
let peers = app.service.peers.lock().await; let peers = app.service.peers.lock().await;
app.peers.clear(); app.peers.clear();
@ -101,6 +93,7 @@ async fn start_and_run(terminal: &mut DefaultTerminal, config: Config) -> Result
app.receive_requests.remove(&id); app.receive_requests.remove(&id);
} }
} }
}
Ok(()) Ok(())
} }

View file

@ -81,7 +81,7 @@ impl FileMetadata {
} }
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum DeviceType { pub enum DeviceType {
Mobile, Mobile,
@ -110,6 +110,14 @@ pub struct Device {
pub announce: Option<bool>, pub announce: Option<bool>,
} }
impl PartialEq for Device {
fn eq(&self, other: &Self) -> bool {
self.fingerprint == other.fingerprint
}
}
impl Eq for Device {}
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Protocol { pub enum Protocol {

View file

@ -9,11 +9,12 @@ use axum::{
}; };
use julid::Julid; use julid::Julid;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use crate::{ use crate::{
JocalEvent, JocalService, Peers, ReceiveDialog, ReceiveRequest, Sessions, JocalEvent, JocalService, Peers, ReceiveDialog, ReceiveRequest, SendingType, Sessions,
error::{LocalSendError, Result}, error::{LocalSendError, Result},
models::{Device, FileMetadata}, models::{Device, FileMetadata},
}; };
@ -53,112 +54,14 @@ pub struct PrepareUploadRequest {
} }
impl JocalService { impl JocalService {
pub async fn prepare_upload(
&self,
peer: &str,
files: BTreeMap<String, FileMetadata>,
) -> Result<PrepareUploadResponse> {
let Some((addr, device)) = self.peers.lock().await.get(peer).cloned() else {
return Err(LocalSendError::PeerNotFound);
};
log::debug!("preparing upload request");
let request = self
.client
.post(format!(
"{}://{}/api/localsend/v2/prepare-upload",
device.protocol, addr
))
.json(&PrepareUploadRequest {
info: self.config.device.clone(),
files: files.clone(),
})
.timeout(Duration::from_secs(30));
debug!("sending '{request:?}' to peer at {addr:?}");
let response = request.send().await?;
debug!("Response: {response:?}");
let response: PrepareUploadResponse = match response.json().await {
Err(e) => {
error!("got error deserializing response: {e:?}");
return Err(LocalSendError::RequestError(e));
}
Ok(r) => r,
};
debug!("decoded response: {response:?}");
let session = Session {
session_id: response.session_id.clone(),
files,
file_tokens: response.files.clone(),
receiver: device,
sender: self.config.device.clone(),
status: SessionStatus::Active,
addr,
};
self.sessions
.lock()
.await
.insert(response.session_id.clone(), session);
Ok(response)
}
pub async fn send_file(&self, peer: &str, file_path: PathBuf) -> Result<()> { pub async fn send_file(&self, peer: &str, file_path: PathBuf) -> Result<()> {
let file_metadata = FileMetadata::from_path(&file_path)?; let content = SendingType::File(file_path);
let mut files = BTreeMap::new(); self.send_content(peer, content).await
files.insert(file_metadata.id.clone(), file_metadata.clone());
let prepare_response = self.prepare_upload(peer, files).await?;
let token = prepare_response
.files
.get(&file_metadata.id)
.ok_or(LocalSendError::InvalidToken)?;
let file_contents = tokio::fs::read(&file_path).await?;
let bytes = Bytes::from(file_contents);
self.send_bytes(
&prepare_response.session_id,
&file_metadata.id,
token,
bytes,
)
.await?;
Ok(())
} }
pub async fn send_text(&self, peer: &str, text: &str) -> Result<()> { pub async fn send_text(&self, peer: &str, text: &str) -> Result<()> {
let file_metadata = FileMetadata::from_text(text)?; let content = SendingType::Text(text.to_owned());
let mut files = BTreeMap::new(); self.send_content(peer, content).await
files.insert(file_metadata.id.clone(), file_metadata.clone());
let prepare_response = self.prepare_upload(peer, files).await?;
let token = prepare_response
.files
.get(&file_metadata.id)
.ok_or(LocalSendError::InvalidToken)?;
let bytes = Bytes::from(text.to_owned());
self.send_bytes(
&prepare_response.session_id,
&file_metadata.id,
token,
bytes,
)
.await?;
Ok(())
} }
pub async fn cancel_upload(&self, session_id: &str) -> Result<()> { pub async fn cancel_upload(&self, session_id: &str) -> Result<()> {
@ -183,38 +86,86 @@ impl JocalService {
Ok(()) Ok(())
} }
async fn send_bytes( // spawns a tokio task to wait for responses
&self, async fn send_content(&self, peer: &str, content: SendingType) -> Result<()> {
session_id: &str, let (metadata, bytes) = match content {
content_id: &str, SendingType::File(path) => {
token: &String, let contents = tokio::fs::read(&path).await?;
body: Bytes, let bytes = Bytes::from(contents);
) -> Result<()> { (FileMetadata::from_path(&path)?, bytes)
let sessions = self.sessions.lock().await; }
let session = sessions.get(session_id).unwrap(); SendingType::Text(text) => (FileMetadata::from_text(&text)?, Bytes::from(text)),
};
if session.status != SessionStatus::Active { let mut files = BTreeMap::new();
return Err(LocalSendError::SessionInactive); files.insert(metadata.id.clone(), metadata.clone());
let ourself = self.config.device.clone();
let client = self.client.clone();
let tx = self.transfer_event_tx.clone();
let peer = peer.to_string();
let sessions = self.sessions.clone();
let peers = self.peers.clone();
tokio::task::spawn(async move {
fn send_tx(msg: JocalEvent, tx: &UnboundedSender<JocalEvent>) {
if let Err(e) = tx.send(msg.clone()) {
log::error!("got error sending {msg:?} to frontend: {e:?}");
}
} }
if session.file_tokens.get(content_id) != Some(token) { let prepare_response =
return Err(LocalSendError::InvalidToken); do_prepare_upload(ourself, &client, &peer, &peers, &sessions, files).await;
let prepare_response = match prepare_response {
Ok(r) => r,
Err(e) => {
log::debug!("got error from remote receiver: {e:?}");
send_tx(JocalEvent::SendDenied, &tx);
return;
} }
};
let request = self.client send_tx(JocalEvent::SendApproved(metadata.id.clone()), &tx);
.post(format!(
"{}://{}/api/localsend/v2/upload?sessionId={session_id}&fileId={content_id}&token={token}",
session.receiver.protocol, session.addr))
.body(body).build()?;
debug!("Uploading bytes: {request:?}"); let token = match prepare_response.files.get(&metadata.id) {
let response = self.client.execute(request).await?; Some(t) => t,
None => {
if response.status() != 200 { log::warn!("");
warn!("Upload failed: {response:?}"); send_tx(
return Err(LocalSendError::UploadFailed); JocalEvent::SendFailed {
error: "missing token in prepare response from remote".into(),
},
&tx,
);
return;
} }
};
let content_id = &metadata.id;
let session_id = prepare_response.session_id;
let resp = do_send_bytes(sessions, client, &session_id, content_id, token, bytes).await;
match resp {
Ok(_) => {
send_tx(
JocalEvent::SendSuccess {
content: content_id.to_owned(),
session: session_id,
},
&tx,
);
}
Err(e) => {
send_tx(
JocalEvent::SendFailed {
error: format!("{e:?}"),
},
&tx,
);
}
}
});
Ok(()) Ok(())
} }
} }
@ -384,7 +335,7 @@ pub async fn handle_cancel(
session.status = SessionStatus::Cancelled; session.status = SessionStatus::Cancelled;
if let Ok(id) = Julid::from_str(&params.session_id) { if let Ok(id) = Julid::from_str(&params.session_id) {
service.send_event(JocalEvent::Cancelled(id)); service.send_event(JocalEvent::Cancelled { session_id: id });
}; };
StatusCode::OK.into_response() StatusCode::OK.into_response()
@ -397,13 +348,50 @@ pub struct CancelParams {
session_id: String, session_id: String,
} }
// free function that can be called inside a future in tokio::task::spawn()
async fn do_send_bytes(
sessions: Sessions,
client: Client,
session_id: &str,
content_id: &str,
token: &String,
body: Bytes,
) -> Result<()> {
let sessions = sessions.lock().await;
let session = sessions.get(session_id).unwrap();
if session.status != SessionStatus::Active {
return Err(LocalSendError::SessionInactive);
}
if session.file_tokens.get(content_id) != Some(token) {
return Err(LocalSendError::InvalidToken);
}
let request = client
.post(format!(
"{}://{}/api/localsend/v2/upload?sessionId={session_id}&fileId={content_id}&token={token}",
session.receiver.protocol, session.addr))
.body(body);
debug!("Uploading bytes: {request:?}");
let response = request.send().await?;
if response.status() != 200 {
log::trace!("non-200 remote response: {response:?}");
return Err(LocalSendError::UploadFailed);
}
Ok(())
}
// free function that can be called inside a future in tokio::task::spawn()
async fn do_prepare_upload( async fn do_prepare_upload(
ourself: Device, ourself: Device,
client: reqwest::Client, client: &reqwest::Client,
tx: UnboundedSender<JocalEvent>,
peer: &str, peer: &str,
peers: Peers, peers: &Peers,
sessions: Sessions, sessions: &Sessions,
files: BTreeMap<String, FileMetadata>, files: BTreeMap<String, FileMetadata>,
) -> Result<PrepareUploadResponse> { ) -> Result<PrepareUploadResponse> {
let Some((addr, device)) = peers.lock().await.get(peer).cloned() else { let Some((addr, device)) = peers.lock().await.get(peer).cloned() else {