Compare commits

..

2 commits

Author SHA1 Message Date
Joe Ardent
34e64f5e51 move stuff around a little 2025-08-02 11:28:57 -07:00
Joe Ardent
d73076f197 rename joecalstate to joecalservice, prune stale upload requests 2025-08-02 11:08:17 -07:00
7 changed files with 148 additions and 166 deletions

View file

@ -1,25 +1,19 @@
use std::{collections::BTreeMap, net::SocketAddr, sync::OnceLock, time::Duration};
use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind};
use futures::{FutureExt, StreamExt};
use joecalsend::{
Config, JoecalState, JoecalUploadRequest, Listeners, TransferEvent, UploadDialog,
error::Result, models::Device,
};
use joecalsend::{JoecalService, JoecalUploadRequest, TransferEvent, UploadDialog, error::Result};
use julid::Julid;
use log::{LevelFilter, debug, error, info, warn};
use ratatui::{DefaultTerminal, Frame, widgets::TableState};
use tokio::{
sync::mpsc::{UnboundedReceiver, unbounded_channel},
task::JoinSet,
};
use log::{LevelFilter, debug, error, warn};
use ratatui::{Frame, widgets::TableState};
use tokio::sync::mpsc::UnboundedReceiver;
pub mod widgets;
pub type Peers = BTreeMap<SocketAddr, (String, String)>;
pub struct App {
pub state: OnceLock<JoecalState>,
pub service: JoecalService,
pub screen: Vec<CurrentScreen>,
pub events: EventStream,
// addr -> (alias, fingerprint)
@ -28,7 +22,7 @@ pub struct App {
upload_state: TableState,
// for getting messages back from the web server or web client about things we've done; the
// other end is held by the state
transfer_event_rx: OnceLock<UnboundedReceiver<TransferEvent>>,
event_listener: UnboundedReceiver<TransferEvent>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -40,69 +34,20 @@ pub enum CurrentScreen {
Logging,
}
impl Default for App {
fn default() -> Self {
Self::new()
}
}
impl App {
pub fn new() -> Self {
pub fn new(service: JoecalService, event_listener: UnboundedReceiver<TransferEvent>) -> Self {
App {
service,
event_listener,
screen: vec![CurrentScreen::Main],
state: Default::default(),
events: Default::default(),
peers: Default::default(),
uploads: Default::default(),
upload_state: Default::default(),
transfer_event_rx: Default::default(),
}
}
#[tokio::main]
pub async fn start_and_run(
&mut self,
terminal: &mut DefaultTerminal,
config: Config,
device: Device,
) -> Result<()> {
let (transfer_event_tx, transfer_event_rx) = unbounded_channel();
let state = JoecalState::new(device, transfer_event_tx)
.await
.expect("Could not create JoecalState");
let _ = self.transfer_event_rx.set(transfer_event_rx);
let mut handles = JoinSet::new();
state.start(&config, &mut handles).await;
let _ = self.state.set(state);
loop {
terminal.draw(|frame| self.draw(frame))?;
self.handle_events().await?;
if let Some(&top) = self.screen.last()
&& top == CurrentScreen::Stopping
{
self.state.get().unwrap().stop().await;
break;
}
let peers = self.state.get().unwrap().peers.lock().await;
self.peers.clear();
peers.iter().for_each(|(fingerprint, (addr, device))| {
let alias = device.alias.clone();
self.peers
.insert(addr.to_owned(), (alias, fingerprint.to_owned()));
});
}
shutdown(&mut handles).await;
Ok(())
}
async fn handle_events(&mut self) -> Result<()> {
pub async fn handle_events(&mut self) -> Result<()> {
tokio::select! {
event = self.events.next().fuse() => {
if let Some(Ok(evt)) = event {
@ -116,7 +61,7 @@ impl App {
}
}
}
transfer_event = self.transfer_event_rx.get_mut().unwrap().recv() => {
transfer_event = self.event_listener.recv() => {
if let Some(event) = transfer_event {
debug!("got transferr event {event:?}");
match event {
@ -135,7 +80,7 @@ impl App {
Ok(())
}
fn handle_key_event(&mut self, key_event: KeyEvent) {
pub fn handle_key_event(&mut self, key_event: KeyEvent) {
match self.screen.last().unwrap() {
CurrentScreen::Logging => match key_event.code {
KeyCode::Esc => self.pop(),
@ -164,7 +109,7 @@ impl App {
}
}
fn accept(&mut self) {
pub fn accept(&mut self) {
let Some(idx) = self.upload_state.selected() else {
return;
};
@ -182,7 +127,7 @@ impl App {
};
}
fn deny(&mut self) {
pub fn deny(&mut self) {
let Some(idx) = self.upload_state.selected() else {
return;
};
@ -201,15 +146,15 @@ impl App {
self.uploads.remove(key);
}
fn draw(&mut self, frame: &mut Frame) {
pub fn draw(&mut self, frame: &mut Frame) {
frame.render_widget(self, frame.area());
}
fn exit(&mut self) {
pub fn exit(&mut self) {
self.screen.push(CurrentScreen::Stopping);
}
fn send(&mut self) {
pub fn send(&mut self) {
let last = self.screen.last();
match last {
Some(CurrentScreen::Sending) => {}
@ -217,7 +162,7 @@ impl App {
}
}
fn recv(&mut self) {
pub fn recv(&mut self) {
let last = self.screen.last();
match last {
Some(CurrentScreen::Receiving) => {}
@ -225,7 +170,7 @@ impl App {
}
}
fn logs(&mut self) {
pub fn logs(&mut self) {
let last = self.screen.last();
match last {
Some(CurrentScreen::Logging) => {}
@ -233,7 +178,7 @@ impl App {
}
}
fn pop(&mut self) {
pub fn pop(&mut self) {
self.screen.pop();
if self.screen.last().is_none() {
self.screen.push(CurrentScreen::Main);
@ -250,26 +195,3 @@ fn change_log_level(delta: isize) {
log::set_max_level(level);
}
async fn shutdown(handles: &mut JoinSet<Listeners>) {
let mut alarm = tokio::time::interval(tokio::time::Duration::from_secs(5));
alarm.tick().await;
loop {
tokio::select! {
join_result = handles.join_next() => {
match join_result {
Some(handle) => match handle {
Ok(h) => info!("Stopped {h:?}"),
Err(e) => error!("Got error {e:?}"),
}
None => break,
}
}
_ = alarm.tick() => {
info!("Exit timeout reached, aborting all unjoined tasks");
handles.abort_all();
break;
},
}
}
}

View file

@ -11,9 +11,9 @@ use axum::{
use log::{debug, error, trace, warn};
use tokio::net::UdpSocket;
use crate::{Config, JoecalState, RunningState, models::Device};
use crate::{Config, JoecalService, RunningState, models::Device};
impl JoecalState {
impl JoecalService {
pub async fn announce(
&self,
socket: Option<SocketAddr>,
@ -96,7 +96,7 @@ impl JoecalState {
/// Axum request handler for receiving other devices' registration requests.
pub async fn register_device(
State(state): State<JoecalState>,
State(state): State<JoecalService>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(device): Json<Device>,
) -> Json<Device> {
@ -134,3 +134,33 @@ async fn announce_multicast(
socket.send_to(msg.as_bytes(), addr).await?;
Ok(())
}
/*
async fn announce_unicast(
device: &Device,
ip: Option<SocketAddr>,
client: reqwest::Client,
) -> crate::error::Result<()> {
// for enumerating subnet peers when multicast fails (https://github.com/localsend/protocol?tab=readme-ov-file#32-http-legacy-mode)
let std::net::IpAddr::V4(ip) = local_ip_address::local_ip()? else {
unreachable!()
};
let mut _network_ip = ip;
let nifs = NetworkInterface::show()?;
for addr in nifs.into_iter().flat_map(|i| i.addr) {
if let Addr::V4(V4IfAddr {
ip: ifip,
netmask: Some(netmask),
..
}) = addr
&& ip == ifip
{
_network_ip = ip & netmask;
break;
}
}
todo!()
}
*/

View file

@ -44,6 +44,9 @@ pub enum LocalSendError {
#[error("Error getting local IP")]
IpAddrError(#[from] local_ip_address::Error),
#[error("Error getting network interface")]
NetworkInterfaceError(#[from] network_interface::Error),
}
pub type Result<T> = std::result::Result<T, LocalSendError>;

View file

@ -9,12 +9,12 @@ use tokio::{net::TcpListener, sync::mpsc};
use tower_http::limit::RequestBodyLimitLayer;
use crate::{
Config, JoecalState,
Config, JoecalService,
discovery::register_device,
transfer::{register_prepare_upload, register_upload},
};
impl JoecalState {
impl JoecalService {
pub async fn start_http_server(
&self,
stop_rx: mpsc::Receiver<()>,

View file

@ -63,21 +63,20 @@ pub struct JoecalUploadRequest {
/// Contains the main network and backend state for an application session.
#[derive(Clone)]
pub struct JoecalState {
pub struct JoecalService {
pub device: Device,
pub peers: Arc<Mutex<HashMap<String, (SocketAddr, Device)>>>,
pub sessions: Arc<Mutex<HashMap<String, Session>>>, // Session ID to Session
pub running_state: Arc<Mutex<RunningState>>,
pub socket: Arc<UdpSocket>,
pub client: reqwest::Client,
pub upload_requests: Arc<Mutex<HashMap<Julid, JoecalUploadRequest>>>,
shutdown_sender: OnceLock<ShutdownSender>,
// the receiving end will be held by the application so it can update the UI based on backend
// events
transfer_event_tx: UnboundedSender<TransferEvent>,
}
impl JoecalState {
impl JoecalService {
pub async fn new(
device: Device,
transfer_event_tx: UnboundedSender<TransferEvent>,
@ -91,12 +90,11 @@ impl JoecalState {
device,
client: reqwest::Client::new(),
socket: socket.into(),
transfer_event_tx,
peers: Default::default(),
sessions: Default::default(),
running_state: Default::default(),
shutdown_sender: Default::default(),
upload_requests: Default::default(),
transfer_event_tx,
})
}
@ -141,9 +139,6 @@ impl JoecalState {
Listeners::Udp
}
});
// TODO: add a task that periodically clears out the upload requests if
// they're too old; the keys are julids so they have the time in them
}
pub async fn stop(&self) {
@ -162,28 +157,6 @@ impl JoecalState {
peers.clear();
}
pub async fn get_upload_request(&self, id: Julid) -> Option<JoecalUploadRequest> {
self.upload_requests.lock().await.get(&id).cloned()
}
pub async fn clear_upload_request(&self, id: Julid) {
let _ = self.upload_requests.lock().await.remove(&id);
}
/// Add a transmitter for an upload request confirmation dialog that the
/// application frontend can use to tell the Axum handler whether or not to
/// accept the upload.
///
/// IMPORTANT! Be sure to call `clear_upload_request(id)` when you're done
/// getting an answer back/before you exit!
pub async fn add_upload_request(&self, id: Julid, request: JoecalUploadRequest) {
self.upload_requests
.lock()
.await
.entry(id)
.insert_entry(request);
}
pub fn send_event(&self, event: TransferEvent) {
if let Err(e) = self.transfer_event_tx.send(event.clone()) {
error!("got error sending transfer event '{event:?}': {e:?}");

View file

@ -1,18 +1,15 @@
use app::App;
use joecalsend::{Config, error, models::Device};
use local_ip_address::local_ip;
use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig, V4IfAddr};
use joecalsend::{Config, JoecalService, Listeners, error, models::Device};
use log::{error, info};
use ratatui::DefaultTerminal;
use tokio::{sync::mpsc::unbounded_channel, task::JoinSet};
use tui_logger::{LevelFilter, init_logger, set_env_filter_from_env};
mod app;
use app::{App, CurrentScreen};
fn main() -> error::Result<()> {
let device = Device::default();
let std::net::IpAddr::V4(ip) = local_ip()? else {
unreachable!()
};
if std::env::var("RUST_LOG").is_err() {
unsafe {
std::env::set_var("RUST_LOG", "joecalsend");
@ -21,29 +18,86 @@ fn main() -> error::Result<()> {
init_logger(LevelFilter::Debug).map_err(|e| std::io::Error::other(format!("{e}")))?;
set_env_filter_from_env(None);
// for enumerating subnet peers when multicast fails (https://github.com/localsend/protocol?tab=readme-ov-file#32-http-legacy-mode)
let mut _network_ip = ip;
let nifs = NetworkInterface::show().unwrap();
for addr in nifs.into_iter().flat_map(|i| i.addr) {
if let Addr::V4(V4IfAddr {
ip: ifip,
netmask: Some(netmask),
..
}) = addr
&& ip == ifip
{
_network_ip = ip & netmask;
break;
}
}
let config = Config::default();
let mut app = App::new();
let mut terminal = ratatui::init();
let result = app.start_and_run(&mut terminal, config, device);
let result = start_and_run(&mut terminal, config, device);
ratatui::restore();
result
}
#[tokio::main]
async fn start_and_run(
terminal: &mut DefaultTerminal,
config: Config,
device: Device,
) -> error::Result<()> {
let (event_tx, event_listener) = unbounded_channel();
let service = JoecalService::new(device, event_tx)
.await
.expect("Could not create JoecalService");
let mut app = App::new(service, event_listener);
let mut handles = JoinSet::new();
app.service.start(&config, &mut handles).await;
loop {
terminal.draw(|frame| app.draw(frame))?;
app.handle_events().await?;
if let Some(&top) = app.screen.last()
&& top == CurrentScreen::Stopping
{
app.service.stop().await;
break;
}
let peers = app.service.peers.lock().await;
app.peers.clear();
peers.iter().for_each(|(fingerprint, (addr, device))| {
let alias = device.alias.clone();
app.peers
.insert(addr.to_owned(), (alias, fingerprint.to_owned()));
});
let mut stale_uploads = Vec::with_capacity(app.uploads.len());
let now = chrono::Utc::now().timestamp_millis() as u64;
for (id, request) in app.uploads.iter() {
if request.tx.is_closed() || (now - id.timestamp()) > 60_000 {
stale_uploads.push(*id);
}
}
for id in stale_uploads {
app.uploads.remove(&id);
}
}
shutdown(&mut handles).await;
Ok(())
}
async fn shutdown(handles: &mut JoinSet<Listeners>) {
let mut alarm = tokio::time::interval(tokio::time::Duration::from_secs(5));
alarm.tick().await;
loop {
tokio::select! {
join_result = handles.join_next() => {
match join_result {
Some(handle) => match handle {
Ok(h) => info!("Stopped {h:?}"),
Err(e) => error!("Got error {e:?}"),
}
None => break,
}
}
_ = alarm.tick() => {
info!("Exit timeout reached, aborting all unjoined tasks");
handles.abort_all();
break;
},
}
}
}

View file

@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::unbounded_channel;
use crate::{
JoecalState, JoecalUploadRequest, TransferEvent, UploadDialog,
JoecalService, JoecalUploadRequest, TransferEvent, UploadDialog,
error::{LocalSendError, Result},
models::{Device, FileMetadata},
};
@ -52,7 +52,7 @@ pub struct PrepareUploadRequest {
pub files: HashMap<String, FileMetadata>,
}
impl JoecalState {
impl JoecalService {
pub async fn prepare_upload(
&self,
peer: String,
@ -190,7 +190,7 @@ impl JoecalState {
}
pub async fn register_prepare_upload(
State(state): State<JoecalState>,
State(state): State<JoecalService>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(req): Json<PrepareUploadRequest>,
) -> impl IntoResponse {
@ -263,7 +263,7 @@ pub async fn register_prepare_upload(
pub async fn register_upload(
Query(params): Query<UploadParams>,
State(state): State<JoecalState>,
State(state): State<JoecalService>,
Extension(download_dir): Extension<String>,
body: Bytes,
) -> impl IntoResponse {
@ -339,7 +339,7 @@ pub struct UploadParams {
pub async fn register_cancel(
Query(params): Query<CancelParams>,
State(state): State<JoecalState>,
State(state): State<JoecalService>,
) -> impl IntoResponse {
let mut sessions_lock = state.sessions.lock().await;
let session = match sessions_lock.get_mut(&params.session_id) {