shutdown works
This commit is contained in:
parent
9fa9df476d
commit
5e4e31d64f
3 changed files with 32 additions and 37 deletions
|
@ -33,12 +33,11 @@ impl JoecalState {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = timeout.tick() => {
|
_ = timeout.tick() => {
|
||||||
if let Ok(rstate) = self.running_state.try_lock()
|
let rstate = self.running_state.lock().await;
|
||||||
&& *rstate == RunningState::Stopping
|
if *rstate == RunningState::Stopping
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
dbg!("tick");
|
|
||||||
},
|
},
|
||||||
r = self.socket.recv_from(&mut buf) => {
|
r = self.socket.recv_from(&mut buf) => {
|
||||||
match r {
|
match r {
|
||||||
|
|
41
src/lib.rs
41
src/lib.rs
|
@ -7,7 +7,7 @@ pub mod transfer;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
||||||
sync::Arc,
|
sync::{Arc, OnceLock},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -25,6 +25,8 @@ pub const MULTICAST_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 167);
|
||||||
pub const LISTENING_SOCKET_ADDR: SocketAddrV4 =
|
pub const LISTENING_SOCKET_ADDR: SocketAddrV4 =
|
||||||
SocketAddrV4::new(Ipv4Addr::from_bits(0), DEFAULT_PORT);
|
SocketAddrV4::new(Ipv4Addr::from_bits(0), DEFAULT_PORT);
|
||||||
|
|
||||||
|
type ShutdownSender = mpsc::Sender<()>;
|
||||||
|
|
||||||
/// Contains the main network and application state for an application session.
|
/// Contains the main network and application state for an application session.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct JoecalState {
|
pub struct JoecalState {
|
||||||
|
@ -34,7 +36,7 @@ pub struct JoecalState {
|
||||||
pub running_state: Arc<Mutex<RunningState>>,
|
pub running_state: Arc<Mutex<RunningState>>,
|
||||||
pub socket: Arc<UdpSocket>,
|
pub socket: Arc<UdpSocket>,
|
||||||
pub client: reqwest::Client,
|
pub client: reqwest::Client,
|
||||||
stop_tx: std::sync::OnceLock<mpsc::Sender<()>>,
|
stop_tx: OnceLock<ShutdownSender>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JoecalState {
|
impl JoecalState {
|
||||||
|
@ -62,10 +64,10 @@ impl JoecalState {
|
||||||
let state = self.clone();
|
let state = self.clone();
|
||||||
let konfig = config.clone();
|
let konfig = config.clone();
|
||||||
let server_handle = {
|
let server_handle = {
|
||||||
let (tx, rx) = mpsc::channel(1);
|
let (tx, shutdown_rx) = mpsc::channel(1);
|
||||||
self.stop_tx.get_or_init(|| tx);
|
self.stop_tx.get_or_init(|| tx);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = state.start_http_server(rx, &konfig).await {
|
if let Err(e) = state.start_http_server(shutdown_rx, &konfig).await {
|
||||||
eprintln!("HTTP server error: {e}");
|
eprintln!("HTTP server error: {e}");
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -85,12 +87,10 @@ impl JoecalState {
|
||||||
let announcement_handle = {
|
let announcement_handle = {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
if let Ok(rstate) = state.running_state.try_lock()
|
let rstate = state.running_state.lock().await;
|
||||||
&& *rstate == RunningState::Stopping
|
if *rstate == RunningState::Stopping {
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = state.announce(None, &config).await {
|
if let Err(e) = state.announce(None, &config).await {
|
||||||
eprintln!("Announcement error: {e}");
|
eprintln!("Announcement error: {e}");
|
||||||
}
|
}
|
||||||
|
@ -104,18 +104,17 @@ impl JoecalState {
|
||||||
|
|
||||||
pub async fn stop(&self) {
|
pub async fn stop(&self) {
|
||||||
loop {
|
loop {
|
||||||
if let Ok(mut rstate) = self.running_state.try_lock() {
|
let mut rstate = self.running_state.lock().await;
|
||||||
*rstate = RunningState::Stopping;
|
*rstate = RunningState::Stopping;
|
||||||
if self
|
if self
|
||||||
.stop_tx
|
.stop_tx
|
||||||
.get()
|
.get()
|
||||||
.expect("Could not get stop signal transmitter")
|
.expect("Could not get stop signal transmitter")
|
||||||
.send(())
|
.send(())
|
||||||
.await
|
.await
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
tokio::time::sleep(Duration::from_millis(777)).await;
|
tokio::time::sleep(Duration::from_millis(777)).await;
|
||||||
}
|
}
|
||||||
|
@ -131,8 +130,6 @@ impl JoecalState {
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum RunningState {
|
pub enum RunningState {
|
||||||
Running,
|
Running,
|
||||||
Sending,
|
|
||||||
Receiving,
|
|
||||||
Stopping,
|
Stopping,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
23
src/main.rs
23
src/main.rs
|
@ -46,7 +46,7 @@ async fn main() -> error::Result<()> {
|
||||||
let config = Config::default();
|
let config = Config::default();
|
||||||
let (h1, h2, h3) = state.start(&config).await.unwrap();
|
let (h1, h2, h3) = state.start(&config).await.unwrap();
|
||||||
|
|
||||||
let mut app = App::new(state.clone());
|
let mut app = App::new(state.clone()).await;
|
||||||
let mut terminal = ratatui::init();
|
let mut terminal = ratatui::init();
|
||||||
let result = app.run(&mut terminal).await;
|
let result = app.run(&mut terminal).await;
|
||||||
ratatui::restore();
|
ratatui::restore();
|
||||||
|
@ -58,20 +58,24 @@ async fn main() -> error::Result<()> {
|
||||||
|
|
||||||
struct App {
|
struct App {
|
||||||
state: JoecalState,
|
state: JoecalState,
|
||||||
|
rstate: RunningState,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl App {
|
impl App {
|
||||||
pub fn new(state: JoecalState) -> Self {
|
pub async fn new(state: JoecalState) -> Self {
|
||||||
App { state }
|
App {
|
||||||
|
state: state.clone(),
|
||||||
|
rstate: *state.running_state.lock().await,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self, terminal: &mut DefaultTerminal) -> io::Result<()> {
|
pub async fn run(&mut self, terminal: &mut DefaultTerminal) -> io::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
terminal.draw(|frame| self.draw(frame))?;
|
terminal.draw(|frame| self.draw(frame))?;
|
||||||
self.handle_events().await?;
|
self.handle_events().await?;
|
||||||
if let Ok(rstate) = self.state.running_state.try_lock()
|
let rstate = self.state.running_state.lock().await;
|
||||||
&& *rstate == RunningState::Stopping
|
self.rstate = *rstate;
|
||||||
{
|
if *rstate == RunningState::Stopping {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -127,12 +131,7 @@ impl Widget for &App {
|
||||||
.title_bottom(instructions.centered())
|
.title_bottom(instructions.centered())
|
||||||
.border_set(border::THICK);
|
.border_set(border::THICK);
|
||||||
|
|
||||||
let rs = self
|
let rs = format!("{:?}", self.rstate);
|
||||||
.state
|
|
||||||
.running_state
|
|
||||||
.try_lock()
|
|
||||||
.map(|s| format!("{s:?}"))
|
|
||||||
.unwrap_or("Just a moment...".into());
|
|
||||||
let state_text = Text::from(vec![Line::from(vec!["runstate: ".into(), rs.yellow()])]);
|
let state_text = Text::from(vec![Line::from(vec!["runstate: ".into(), rs.yellow()])]);
|
||||||
|
|
||||||
Paragraph::new(state_text)
|
Paragraph::new(state_text)
|
||||||
|
|
Loading…
Reference in a new issue