rename joecalstate to joecalservice, prune stale upload requests
This commit is contained in:
parent
3e094c46dc
commit
d73076f197
7 changed files with 105 additions and 119 deletions
111
src/app/mod.rs
111
src/app/mod.rs
|
@ -1,9 +1,9 @@
|
|||
use std::{collections::BTreeMap, net::SocketAddr, sync::OnceLock, time::Duration};
|
||||
use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
|
||||
|
||||
use crossterm::event::{Event, EventStream, KeyCode, KeyEvent, KeyEventKind};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use joecalsend::{
|
||||
Config, JoecalState, JoecalUploadRequest, Listeners, TransferEvent, UploadDialog,
|
||||
Config, JoecalService, JoecalUploadRequest, Listeners, TransferEvent, UploadDialog,
|
||||
error::Result, models::Device,
|
||||
};
|
||||
use julid::Julid;
|
||||
|
@ -19,7 +19,7 @@ pub mod widgets;
|
|||
pub type Peers = BTreeMap<SocketAddr, (String, String)>;
|
||||
|
||||
pub struct App {
|
||||
pub state: OnceLock<JoecalState>,
|
||||
pub service: JoecalService,
|
||||
pub screen: Vec<CurrentScreen>,
|
||||
pub events: EventStream,
|
||||
// addr -> (alias, fingerprint)
|
||||
|
@ -28,7 +28,7 @@ pub struct App {
|
|||
upload_state: TableState,
|
||||
// for getting messages back from the web server or web client about things we've done; the
|
||||
// other end is held by the state
|
||||
transfer_event_rx: OnceLock<UnboundedReceiver<TransferEvent>>,
|
||||
event_listener: UnboundedReceiver<TransferEvent>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
|
@ -40,68 +40,71 @@ pub enum CurrentScreen {
|
|||
Logging,
|
||||
}
|
||||
|
||||
impl Default for App {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
#[tokio::main]
|
||||
pub async fn start_and_run(
|
||||
terminal: &mut DefaultTerminal,
|
||||
config: Config,
|
||||
device: Device,
|
||||
) -> Result<()> {
|
||||
let (event_tx, event_listener) = unbounded_channel();
|
||||
|
||||
let service = JoecalService::new(device, event_tx)
|
||||
.await
|
||||
.expect("Could not create JoecalService");
|
||||
|
||||
let mut app = App::new(service, event_listener);
|
||||
|
||||
let mut handles = JoinSet::new();
|
||||
app.service.start(&config, &mut handles).await;
|
||||
loop {
|
||||
terminal.draw(|frame| app.draw(frame))?;
|
||||
app.handle_events().await?;
|
||||
|
||||
if let Some(&top) = app.screen.last()
|
||||
&& top == CurrentScreen::Stopping
|
||||
{
|
||||
app.service.stop().await;
|
||||
break;
|
||||
}
|
||||
|
||||
let peers = app.service.peers.lock().await;
|
||||
app.peers.clear();
|
||||
peers.iter().for_each(|(fingerprint, (addr, device))| {
|
||||
let alias = device.alias.clone();
|
||||
app.peers
|
||||
.insert(addr.to_owned(), (alias, fingerprint.to_owned()));
|
||||
});
|
||||
|
||||
let mut stale_uploads = Vec::new();
|
||||
let now = chrono::Utc::now().timestamp_millis() as u64;
|
||||
for (id, request) in app.uploads.iter() {
|
||||
if request.tx.is_closed() || (now - id.timestamp()) > 60_000 {
|
||||
stale_uploads.push(*id);
|
||||
}
|
||||
}
|
||||
for id in stale_uploads {
|
||||
app.uploads.remove(&id);
|
||||
}
|
||||
}
|
||||
|
||||
shutdown(&mut handles).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl App {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(service: JoecalService, event_listener: UnboundedReceiver<TransferEvent>) -> Self {
|
||||
App {
|
||||
service,
|
||||
event_listener,
|
||||
screen: vec![CurrentScreen::Main],
|
||||
state: Default::default(),
|
||||
events: Default::default(),
|
||||
peers: Default::default(),
|
||||
uploads: Default::default(),
|
||||
upload_state: Default::default(),
|
||||
transfer_event_rx: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn start_and_run(
|
||||
&mut self,
|
||||
terminal: &mut DefaultTerminal,
|
||||
config: Config,
|
||||
device: Device,
|
||||
) -> Result<()> {
|
||||
let (transfer_event_tx, transfer_event_rx) = unbounded_channel();
|
||||
|
||||
let state = JoecalState::new(device, transfer_event_tx)
|
||||
.await
|
||||
.expect("Could not create JoecalState");
|
||||
|
||||
let _ = self.transfer_event_rx.set(transfer_event_rx);
|
||||
|
||||
let mut handles = JoinSet::new();
|
||||
state.start(&config, &mut handles).await;
|
||||
let _ = self.state.set(state);
|
||||
loop {
|
||||
terminal.draw(|frame| self.draw(frame))?;
|
||||
self.handle_events().await?;
|
||||
|
||||
if let Some(&top) = self.screen.last()
|
||||
&& top == CurrentScreen::Stopping
|
||||
{
|
||||
self.state.get().unwrap().stop().await;
|
||||
break;
|
||||
}
|
||||
|
||||
let peers = self.state.get().unwrap().peers.lock().await;
|
||||
self.peers.clear();
|
||||
peers.iter().for_each(|(fingerprint, (addr, device))| {
|
||||
let alias = device.alias.clone();
|
||||
self.peers
|
||||
.insert(addr.to_owned(), (alias, fingerprint.to_owned()));
|
||||
});
|
||||
}
|
||||
|
||||
shutdown(&mut handles).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_events(&mut self) -> Result<()> {
|
||||
tokio::select! {
|
||||
event = self.events.next().fuse() => {
|
||||
|
@ -116,7 +119,7 @@ impl App {
|
|||
}
|
||||
}
|
||||
}
|
||||
transfer_event = self.transfer_event_rx.get_mut().unwrap().recv() => {
|
||||
transfer_event = self.event_listener.recv() => {
|
||||
if let Some(event) = transfer_event {
|
||||
debug!("got transferr event {event:?}");
|
||||
match event {
|
||||
|
|
|
@ -9,11 +9,12 @@ use axum::{
|
|||
extract::{ConnectInfo, State},
|
||||
};
|
||||
use log::{debug, error, trace, warn};
|
||||
use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig, V4IfAddr};
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
use crate::{Config, JoecalState, RunningState, models::Device};
|
||||
use crate::{Config, JoecalService, RunningState, models::Device};
|
||||
|
||||
impl JoecalState {
|
||||
impl JoecalService {
|
||||
pub async fn announce(
|
||||
&self,
|
||||
socket: Option<SocketAddr>,
|
||||
|
@ -96,7 +97,7 @@ impl JoecalState {
|
|||
|
||||
/// Axum request handler for receiving other devices' registration requests.
|
||||
pub async fn register_device(
|
||||
State(state): State<JoecalState>,
|
||||
State(state): State<JoecalService>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
Json(device): Json<Device>,
|
||||
) -> Json<Device> {
|
||||
|
@ -134,3 +135,33 @@ async fn announce_multicast(
|
|||
socket.send_to(msg.as_bytes(), addr).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
async fn announce_unicast(
|
||||
device: &Device,
|
||||
ip: Option<SocketAddr>,
|
||||
client: reqwest::Client,
|
||||
) -> crate::error::Result<()> {
|
||||
// for enumerating subnet peers when multicast fails (https://github.com/localsend/protocol?tab=readme-ov-file#32-http-legacy-mode)
|
||||
let std::net::IpAddr::V4(ip) = local_ip_address::local_ip()? else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let mut _network_ip = ip;
|
||||
let nifs = NetworkInterface::show()?;
|
||||
for addr in nifs.into_iter().flat_map(|i| i.addr) {
|
||||
if let Addr::V4(V4IfAddr {
|
||||
ip: ifip,
|
||||
netmask: Some(netmask),
|
||||
..
|
||||
}) = addr
|
||||
&& ip == ifip
|
||||
{
|
||||
_network_ip = ip & netmask;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
todo!()
|
||||
}
|
||||
*/
|
||||
|
|
|
@ -44,6 +44,9 @@ pub enum LocalSendError {
|
|||
|
||||
#[error("Error getting local IP")]
|
||||
IpAddrError(#[from] local_ip_address::Error),
|
||||
|
||||
#[error("Error getting network interface")]
|
||||
NetworkInterfaceError(#[from] network_interface::Error),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, LocalSendError>;
|
||||
|
|
|
@ -9,12 +9,12 @@ use tokio::{net::TcpListener, sync::mpsc};
|
|||
use tower_http::limit::RequestBodyLimitLayer;
|
||||
|
||||
use crate::{
|
||||
Config, JoecalState,
|
||||
Config, JoecalService,
|
||||
discovery::register_device,
|
||||
transfer::{register_prepare_upload, register_upload},
|
||||
};
|
||||
|
||||
impl JoecalState {
|
||||
impl JoecalService {
|
||||
pub async fn start_http_server(
|
||||
&self,
|
||||
stop_rx: mpsc::Receiver<()>,
|
||||
|
|
33
src/lib.rs
33
src/lib.rs
|
@ -63,21 +63,20 @@ pub struct JoecalUploadRequest {
|
|||
|
||||
/// Contains the main network and backend state for an application session.
|
||||
#[derive(Clone)]
|
||||
pub struct JoecalState {
|
||||
pub struct JoecalService {
|
||||
pub device: Device,
|
||||
pub peers: Arc<Mutex<HashMap<String, (SocketAddr, Device)>>>,
|
||||
pub sessions: Arc<Mutex<HashMap<String, Session>>>, // Session ID to Session
|
||||
pub running_state: Arc<Mutex<RunningState>>,
|
||||
pub socket: Arc<UdpSocket>,
|
||||
pub client: reqwest::Client,
|
||||
pub upload_requests: Arc<Mutex<HashMap<Julid, JoecalUploadRequest>>>,
|
||||
shutdown_sender: OnceLock<ShutdownSender>,
|
||||
// the receiving end will be held by the application so it can update the UI based on backend
|
||||
// events
|
||||
transfer_event_tx: UnboundedSender<TransferEvent>,
|
||||
}
|
||||
|
||||
impl JoecalState {
|
||||
impl JoecalService {
|
||||
pub async fn new(
|
||||
device: Device,
|
||||
transfer_event_tx: UnboundedSender<TransferEvent>,
|
||||
|
@ -91,12 +90,11 @@ impl JoecalState {
|
|||
device,
|
||||
client: reqwest::Client::new(),
|
||||
socket: socket.into(),
|
||||
transfer_event_tx,
|
||||
peers: Default::default(),
|
||||
sessions: Default::default(),
|
||||
running_state: Default::default(),
|
||||
shutdown_sender: Default::default(),
|
||||
upload_requests: Default::default(),
|
||||
transfer_event_tx,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -141,9 +139,6 @@ impl JoecalState {
|
|||
Listeners::Udp
|
||||
}
|
||||
});
|
||||
|
||||
// TODO: add a task that periodically clears out the upload requests if
|
||||
// they're too old; the keys are julids so they have the time in them
|
||||
}
|
||||
|
||||
pub async fn stop(&self) {
|
||||
|
@ -162,28 +157,6 @@ impl JoecalState {
|
|||
peers.clear();
|
||||
}
|
||||
|
||||
pub async fn get_upload_request(&self, id: Julid) -> Option<JoecalUploadRequest> {
|
||||
self.upload_requests.lock().await.get(&id).cloned()
|
||||
}
|
||||
|
||||
pub async fn clear_upload_request(&self, id: Julid) {
|
||||
let _ = self.upload_requests.lock().await.remove(&id);
|
||||
}
|
||||
|
||||
/// Add a transmitter for an upload request confirmation dialog that the
|
||||
/// application frontend can use to tell the Axum handler whether or not to
|
||||
/// accept the upload.
|
||||
///
|
||||
/// IMPORTANT! Be sure to call `clear_upload_request(id)` when you're done
|
||||
/// getting an answer back/before you exit!
|
||||
pub async fn add_upload_request(&self, id: Julid, request: JoecalUploadRequest) {
|
||||
self.upload_requests
|
||||
.lock()
|
||||
.await
|
||||
.entry(id)
|
||||
.insert_entry(request);
|
||||
}
|
||||
|
||||
pub fn send_event(&self, event: TransferEvent) {
|
||||
if let Err(e) = self.transfer_event_tx.send(event.clone()) {
|
||||
error!("got error sending transfer event '{event:?}': {e:?}");
|
||||
|
|
26
src/main.rs
26
src/main.rs
|
@ -1,7 +1,5 @@
|
|||
use app::App;
|
||||
use joecalsend::{Config, error, models::Device};
|
||||
use local_ip_address::local_ip;
|
||||
use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig, V4IfAddr};
|
||||
use tui_logger::{LevelFilter, init_logger, set_env_filter_from_env};
|
||||
|
||||
mod app;
|
||||
|
@ -9,10 +7,6 @@ mod app;
|
|||
fn main() -> error::Result<()> {
|
||||
let device = Device::default();
|
||||
|
||||
let std::net::IpAddr::V4(ip) = local_ip()? else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
if std::env::var("RUST_LOG").is_err() {
|
||||
unsafe {
|
||||
std::env::set_var("RUST_LOG", "joecalsend");
|
||||
|
@ -21,28 +15,10 @@ fn main() -> error::Result<()> {
|
|||
init_logger(LevelFilter::Debug).map_err(|e| std::io::Error::other(format!("{e}")))?;
|
||||
set_env_filter_from_env(None);
|
||||
|
||||
// for enumerating subnet peers when multicast fails (https://github.com/localsend/protocol?tab=readme-ov-file#32-http-legacy-mode)
|
||||
let mut _network_ip = ip;
|
||||
let nifs = NetworkInterface::show().unwrap();
|
||||
for addr in nifs.into_iter().flat_map(|i| i.addr) {
|
||||
if let Addr::V4(V4IfAddr {
|
||||
ip: ifip,
|
||||
netmask: Some(netmask),
|
||||
..
|
||||
}) = addr
|
||||
&& ip == ifip
|
||||
{
|
||||
_network_ip = ip & netmask;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let config = Config::default();
|
||||
|
||||
let mut app = App::new();
|
||||
|
||||
let mut terminal = ratatui::init();
|
||||
let result = app.start_and_run(&mut terminal, config, device);
|
||||
let result = app::start_and_run(&mut terminal, config, device);
|
||||
ratatui::restore();
|
||||
|
||||
result
|
||||
|
|
|
@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
|
|||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
use crate::{
|
||||
JoecalState, JoecalUploadRequest, TransferEvent, UploadDialog,
|
||||
JoecalService, JoecalUploadRequest, TransferEvent, UploadDialog,
|
||||
error::{LocalSendError, Result},
|
||||
models::{Device, FileMetadata},
|
||||
};
|
||||
|
@ -52,7 +52,7 @@ pub struct PrepareUploadRequest {
|
|||
pub files: HashMap<String, FileMetadata>,
|
||||
}
|
||||
|
||||
impl JoecalState {
|
||||
impl JoecalService {
|
||||
pub async fn prepare_upload(
|
||||
&self,
|
||||
peer: String,
|
||||
|
@ -190,7 +190,7 @@ impl JoecalState {
|
|||
}
|
||||
|
||||
pub async fn register_prepare_upload(
|
||||
State(state): State<JoecalState>,
|
||||
State(state): State<JoecalService>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
Json(req): Json<PrepareUploadRequest>,
|
||||
) -> impl IntoResponse {
|
||||
|
@ -263,7 +263,7 @@ pub async fn register_prepare_upload(
|
|||
|
||||
pub async fn register_upload(
|
||||
Query(params): Query<UploadParams>,
|
||||
State(state): State<JoecalState>,
|
||||
State(state): State<JoecalService>,
|
||||
Extension(download_dir): Extension<String>,
|
||||
body: Bytes,
|
||||
) -> impl IntoResponse {
|
||||
|
@ -339,7 +339,7 @@ pub struct UploadParams {
|
|||
|
||||
pub async fn register_cancel(
|
||||
Query(params): Query<CancelParams>,
|
||||
State(state): State<JoecalState>,
|
||||
State(state): State<JoecalService>,
|
||||
) -> impl IntoResponse {
|
||||
let mut sessions_lock = state.sessions.lock().await;
|
||||
let session = match sessions_lock.get_mut(¶ms.session_id) {
|
||||
|
|
Loading…
Reference in a new issue