cleanup and renames ('state' to 'service')

This commit is contained in:
Joe Ardent 2025-08-03 13:46:03 -07:00
parent c8621da2f0
commit b486b33fa5
7 changed files with 56 additions and 54 deletions

View file

@ -1,11 +1,13 @@
use std::{collections::BTreeMap, net::SocketAddr, time::Duration}; use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
use axum::body::Bytes;
use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind}; use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use joecalsend::{JoecalService, ReceiveDialog, ReceiveRequest, TransferEvent, error::Result}; use joecalsend::{JoecalService, ReceiveDialog, ReceiveRequest, TransferEvent, error::Result};
use julid::Julid; use julid::Julid;
use log::{LevelFilter, debug, error, warn}; use log::{LevelFilter, debug, error, warn};
use ratatui::{Frame, widgets::TableState}; use ratatui::{Frame, widgets::TableState};
use ratatui_explorer::FileExplorer;
use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedReceiver;
pub mod widgets; pub mod widgets;
@ -19,10 +21,12 @@ pub struct App {
// addr -> (alias, fingerprint) // addr -> (alias, fingerprint)
pub peers: Peers, pub peers: Peers,
pub receive_requests: BTreeMap<Julid, ReceiveRequest>, pub receive_requests: BTreeMap<Julid, ReceiveRequest>,
upload_state: TableState, receiving_state: TableState,
// for getting messages back from the web server or web client about things we've done; the // for getting messages back from the web server or web client about things we've done; the
// other end is held by the service // other end is held by the service
event_listener: UnboundedReceiver<TransferEvent>, event_listener: UnboundedReceiver<TransferEvent>,
file_picker: FileExplorer,
content: Option<Bytes>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -46,10 +50,12 @@ impl App {
service, service,
event_listener, event_listener,
screen: vec![CurrentScreen::Main], screen: vec![CurrentScreen::Main],
file_picker: FileExplorer::new().expect("could not create file explorer"),
content: None,
events: Default::default(), events: Default::default(),
peers: Default::default(), peers: Default::default(),
receive_requests: Default::default(), receive_requests: Default::default(),
upload_state: Default::default(), receiving_state: Default::default(),
} }
} }
@ -60,7 +66,7 @@ impl App {
match evt { match evt {
Event::Key(key) Event::Key(key)
if key.kind == KeyEventKind::Press if key.kind == KeyEventKind::Press
=> self.handle_key_event(key), => self.handle_key_event(key, evt),
Event::Mouse(_) => {} Event::Mouse(_) => {}
Event::Resize(_, _) => {} Event::Resize(_, _) => {}
_ => {} _ => {}
@ -86,7 +92,7 @@ impl App {
Ok(()) Ok(())
} }
pub fn handle_key_event(&mut self, key_event: KeyEvent) { pub fn handle_key_event(&mut self, key_event: KeyEvent, event: crossterm::event::Event) {
match self.screen.last_mut().unwrap() { match self.screen.last_mut().unwrap() {
CurrentScreen::Logging => match key_event.code { CurrentScreen::Logging => match key_event.code {
KeyCode::Esc => self.pop(), KeyCode::Esc => self.pop(),
@ -96,8 +102,8 @@ impl App {
_ => {} _ => {}
}, },
CurrentScreen::Receiving => match key_event.code { CurrentScreen::Receiving => match key_event.code {
KeyCode::Up => self.upload_state.select_previous(), KeyCode::Up => self.receiving_state.select_previous(),
KeyCode::Down => self.upload_state.select_next(), KeyCode::Down => self.receiving_state.select_next(),
KeyCode::Char('a') => self.accept(), KeyCode::Char('a') => self.accept(),
KeyCode::Char('d') => self.deny(), KeyCode::Char('d') => self.deny(),
KeyCode::Esc => self.pop(), KeyCode::Esc => self.pop(),
@ -110,7 +116,7 @@ impl App {
KeyCode::Char('q') => self.exit(), KeyCode::Char('q') => self.exit(),
KeyCode::Tab => *s = SendingScreen::Peers, KeyCode::Tab => *s = SendingScreen::Peers,
KeyCode::Enter => todo!("send the selected file or enter directory"), KeyCode::Enter => todo!("send the selected file or enter directory"),
_ => todo!("have the file picker handle it"), _ => self.file_picker.handle(&event).unwrap_or_default(),
}, },
SendingScreen::Peers => match key_event.code { SendingScreen::Peers => match key_event.code {
KeyCode::Esc => self.pop(), KeyCode::Esc => self.pop(),
@ -132,7 +138,7 @@ impl App {
} }
pub fn accept(&mut self) { pub fn accept(&mut self) {
let Some(idx) = self.upload_state.selected() else { let Some(idx) = self.receiving_state.selected() else {
return; return;
}; };
// keys are sorted, so we can use the table selection index // keys are sorted, so we can use the table selection index
@ -150,7 +156,7 @@ impl App {
} }
pub fn deny(&mut self) { pub fn deny(&mut self) {
let Some(idx) = self.upload_state.selected() else { let Some(idx) = self.receiving_state.selected() else {
return; return;
}; };
// keys are sorted, so we can use the table selection index // keys are sorted, so we can use the table selection index

View file

@ -126,7 +126,7 @@ impl Widget for &mut App {
NetworkInfoWidget.render(footer_left.inner(footer_margin), buf); NetworkInfoWidget.render(footer_left.inner(footer_margin), buf);
receive_requests( receive_requests(
&rx_reqs, &rx_reqs,
&mut self.upload_state, &mut self.receiving_state,
header_left.inner(header_margin), header_left.inner(header_margin),
buf, buf,
); );
@ -140,7 +140,7 @@ impl Widget for &mut App {
outer_frame(*current_screen, &CONTENT_RECEIVE_MENU, area, buf); outer_frame(*current_screen, &CONTENT_RECEIVE_MENU, area, buf);
receive_requests( receive_requests(
&rx_reqs, &rx_reqs,
&mut self.upload_state, &mut self.receiving_state,
area.inner(subscreen_margin), area.inner(subscreen_margin),
buf, buf,
); );

View file

@ -14,18 +14,19 @@ use tokio::net::UdpSocket;
use crate::{Config, JoecalService, RunningState, models::Device}; use crate::{Config, JoecalService, RunningState, models::Device};
impl JoecalService { impl JoecalService {
pub async fn announce( pub async fn announce(&self, socket: Option<SocketAddr>) -> crate::error::Result<()> {
&self,
socket: Option<SocketAddr>,
config: &Config,
) -> crate::error::Result<()> {
trace!("announcing"); trace!("announcing");
announce_http(&self.device, socket, self.client.clone()).await?; announce_http(&self.device, socket, self.client.clone()).await?;
announce_multicast(&self.device, config.multicast_addr, self.socket.clone()).await?; announce_multicast(
&self.device,
self.config.multicast_addr,
self.socket.clone(),
)
.await?;
Ok(()) Ok(())
} }
pub async fn listen_multicast(&self, config: &Config) -> crate::error::Result<()> { pub async fn listen_multicast(&self) -> crate::error::Result<()> {
let mut buf = [0; 65536]; let mut buf = [0; 65536];
let mut timeout = tokio::time::interval(Duration::from_secs(5)); let mut timeout = tokio::time::interval(Duration::from_secs(5));
@ -48,7 +49,7 @@ impl JoecalService {
match r { match r {
Ok((size, src)) => { Ok((size, src)) => {
let received_msg = String::from_utf8_lossy(&buf[..size]); let received_msg = String::from_utf8_lossy(&buf[..size]);
self.process_device(&received_msg, src, config).await; self.process_device(&received_msg, src, &self.config).await;
} }
Err(e) => { Err(e) => {
error!("Error receiving message: {e}"); error!("Error receiving message: {e}");

View file

@ -1,7 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use axum::{ use axum::{
Extension, Json, Router, Json, Router,
extract::DefaultBodyLimit, extract::DefaultBodyLimit,
routing::{get, post}, routing::{get, post},
}; };
@ -9,20 +9,16 @@ use tokio::{net::TcpListener, sync::mpsc};
use tower_http::limit::RequestBodyLimitLayer; use tower_http::limit::RequestBodyLimitLayer;
use crate::{ use crate::{
Config, JoecalService, JoecalService,
discovery::register_device, discovery::register_device,
transfer::{register_prepare_upload, register_upload}, transfer::{prepare_upload, receive_upload},
}; };
impl JoecalService { impl JoecalService {
pub async fn start_http_server( pub async fn start_http_server(&self, stop_rx: mpsc::Receiver<()>) -> crate::error::Result<()> {
&self, let app = self.create_router();
stop_rx: mpsc::Receiver<()>,
config: &Config,
) -> crate::error::Result<()> {
let app = self.create_router(config);
// TODO: make addr config // TODO: make addr config
let addr = SocketAddr::from(([0, 0, 0, 0], config.port)); let addr = SocketAddr::from(([0, 0, 0, 0], self.config.port));
let listener = TcpListener::bind(&addr).await?; let listener = TcpListener::bind(&addr).await?;
axum::serve( axum::serve(
@ -34,7 +30,7 @@ impl JoecalService {
Ok(()) Ok(())
} }
fn create_router(&self, config: &Config) -> Router { fn create_router(&self) -> Router {
let device = self.device.clone(); let device = self.device.clone();
Router::new() Router::new()
.route("/api/localsend/v2/register", post(register_device)) .route("/api/localsend/v2/register", post(register_device))
@ -42,14 +38,10 @@ impl JoecalService {
"/api/localsend/v2/info", "/api/localsend/v2/info",
get(move || async move { Json(device) }), get(move || async move { Json(device) }),
) )
.route( .route("/api/localsend/v2/prepare-upload", post(prepare_upload))
"/api/localsend/v2/prepare-upload", .route("/api/localsend/v2/upload", post(receive_upload))
post(register_prepare_upload),
)
.route("/api/localsend/v2/upload", post(register_upload))
.layer(DefaultBodyLimit::disable()) .layer(DefaultBodyLimit::disable())
.layer(RequestBodyLimitLayer::new(1024 * 1024 * 1024)) .layer(RequestBodyLimitLayer::new(1024 * 1024 * 1024))
.layer(Extension(config.download_dir.clone()))
.with_state(self.clone()) .with_state(self.clone())
} }
} }

View file

@ -67,6 +67,7 @@ pub struct JoecalService {
pub running_state: Arc<Mutex<RunningState>>, pub running_state: Arc<Mutex<RunningState>>,
pub socket: Arc<UdpSocket>, pub socket: Arc<UdpSocket>,
pub client: reqwest::Client, pub client: reqwest::Client,
pub config: Config,
shutdown_sender: OnceLock<ShutdownSender>, shutdown_sender: OnceLock<ShutdownSender>,
// the receiving end will be held by the application so it can update the UI based on backend // the receiving end will be held by the application so it can update the UI based on backend
// events // events
@ -76,6 +77,7 @@ pub struct JoecalService {
impl JoecalService { impl JoecalService {
pub async fn new( pub async fn new(
device: Device, device: Device,
config: Config,
transfer_event_tx: UnboundedSender<TransferEvent>, transfer_event_tx: UnboundedSender<TransferEvent>,
) -> crate::error::Result<Self> { ) -> crate::error::Result<Self> {
let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?; let socket = UdpSocket::bind(LISTENING_SOCKET_ADDR).await?;
@ -85,6 +87,7 @@ impl JoecalService {
Ok(Self { Ok(Self {
device, device,
config,
client: reqwest::Client::new(), client: reqwest::Client::new(),
socket: socket.into(), socket: socket.into(),
transfer_event_tx, transfer_event_tx,
@ -95,40 +98,39 @@ impl JoecalService {
}) })
} }
pub async fn start(&self, config: &Config, handles: &mut JoinSet<Listeners>) { pub async fn start(&self, handles: &mut JoinSet<Listeners>) {
let state = self.clone(); let service = self.clone();
let konfig = config.clone();
handles.spawn({ handles.spawn({
let (tx, shutdown_rx) = mpsc::channel(1); let (tx, shutdown_rx) = mpsc::channel(1);
let _ = self.shutdown_sender.set(tx); let _ = self.shutdown_sender.set(tx);
async move { async move {
if let Err(e) = state.start_http_server(shutdown_rx, &konfig).await { if let Err(e) = service.start_http_server(shutdown_rx).await {
error!("HTTP server error: {e}"); error!("HTTP server error: {e}");
} }
Listeners::Http Listeners::Http
} }
}); });
let state = self.clone(); let service = self.clone();
let konfig = config.clone();
handles.spawn({ handles.spawn({
async move { async move {
if let Err(e) = state.listen_multicast(&konfig).await { if let Err(e) = service.listen_multicast().await {
error!("UDP listener error: {e}"); error!("UDP listener error: {e}");
} }
Listeners::Multicast Listeners::Multicast
} }
}); });
let state = self.clone(); let service = self.clone();
let config = config.clone();
handles.spawn({ handles.spawn({
async move { async move {
loop { loop {
let rstate = state.running_state.lock().await; let rstate = service.running_state.lock().await;
if *rstate == RunningState::Stopping { if *rstate == RunningState::Stopping {
break; break;
} }
if let Err(e) = state.announce(None, &config).await { if let Err(e) = service.announce(None).await {
error!("Announcement error: {e}"); error!("Announcement error: {e}");
} }
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(5)).await;

View file

@ -35,14 +35,14 @@ async fn start_and_run(
) -> error::Result<()> { ) -> error::Result<()> {
let (event_tx, event_listener) = unbounded_channel(); let (event_tx, event_listener) = unbounded_channel();
let service = JoecalService::new(device, event_tx) let service = JoecalService::new(device, config.clone(), event_tx)
.await .await
.expect("Could not create JoecalService"); .expect("Could not create JoecalService");
let mut app = App::new(service, event_listener); let mut app = App::new(service, event_listener);
let mut handles = JoinSet::new(); let mut handles = JoinSet::new();
app.service.start(&config, &mut handles).await; app.service.start(&mut handles).await;
loop { loop {
terminal.draw(|frame| app.draw(frame))?; terminal.draw(|frame| app.draw(frame))?;
app.handle_events().await?; app.handle_events().await?;

View file

@ -1,7 +1,7 @@
use std::{collections::HashMap, net::SocketAddr, path::PathBuf}; use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use axum::{ use axum::{
Extension, Json, Json,
body::Bytes, body::Bytes,
extract::{ConnectInfo, Query, State}, extract::{ConnectInfo, Query, State},
http::StatusCode, http::StatusCode,
@ -189,7 +189,7 @@ impl JoecalService {
} }
} }
pub async fn register_prepare_upload( pub async fn prepare_upload(
State(service): State<JoecalService>, State(service): State<JoecalService>,
ConnectInfo(addr): ConnectInfo<SocketAddr>, ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(req): Json<PrepareUploadRequest>, Json(req): Json<PrepareUploadRequest>,
@ -259,10 +259,9 @@ pub async fn register_prepare_upload(
.into_response() .into_response()
} }
pub async fn register_upload( pub async fn receive_upload(
Query(params): Query<UploadParams>, Query(params): Query<UploadParams>,
State(service): State<JoecalService>, State(service): State<JoecalService>,
Extension(download_dir): Extension<String>,
body: Bytes, body: Bytes,
) -> impl IntoResponse { ) -> impl IntoResponse {
// Extract query parameters // Extract query parameters
@ -298,8 +297,10 @@ pub async fn register_upload(
} }
}; };
let download_dir = &service.config.download_dir;
// Create directory if it doesn't exist // Create directory if it doesn't exist
if let Err(e) = tokio::fs::create_dir_all(&*download_dir).await { if let Err(e) = tokio::fs::create_dir_all(download_dir).await {
return ( return (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create directory: {e}"), format!("Failed to create directory: {e}"),