use std::{ io::{Stdout, Write}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, OnceLock, }, }; pub mod protocol; pub use protocol::{Body, Message}; pub type DynNode = Arc>; pub trait Node { fn handle(&mut self, runner: &Runner, msg: &Message); } pub struct Runner { msg_id: AtomicU64, node: DynNode, node_id: OnceLock, nodes: OnceLock>, steps: AtomicUsize, output: Arc>, } impl Runner { pub fn new(node: DynNode, output: Stdout) -> Self { Runner { node, msg_id: AtomicU64::new(1), nodes: OnceLock::new(), node_id: OnceLock::new(), steps: AtomicUsize::new(0), output: Arc::new(Mutex::new(output)), } } pub fn run(&self, input: std::sync::mpsc::Receiver) { for msg in input.iter() { self.steps.fetch_add(1, Ordering::SeqCst); let typ = &msg.body.typ; if let "init" = typ.as_str() { self.init(&msg); let body = Body::from_type("init_ok"); self.reply(&msg, body); } else { let mut n = self.node.lock().unwrap(); n.handle(self, &msg); } } } pub fn node_id(&self) -> String { self.node_id.get().cloned().unwrap_or_default() } pub fn next_msg_id(&self) -> u64 { self.msg_id.fetch_add(1, Ordering::SeqCst) } pub fn cur_msg_id(&self) -> u64 { self.msg_id.load(Ordering::SeqCst) } pub fn nodes(&self) -> &[String] { self.nodes.get().unwrap() } pub fn steps(&self) -> usize { self.steps.load(Ordering::SeqCst) } pub fn init(&self, msg: &Message) { let node_id = msg .body .payload .get("node_id") .unwrap() .as_str() .unwrap() .to_owned(); let nodes = msg .body .payload .get("node_ids") .unwrap() .as_array() .unwrap() .iter() .map(|s| s.as_str().unwrap().to_string()) .collect::>(); let _ = self.node_id.get_or_init(|| node_id.to_owned()); let _ = self.nodes.get_or_init(|| nodes.to_vec()); } pub fn reply(&self, req: &Message, body: Body) { let mut body = body; let dest = req.src.clone(); let in_reply_to = req.body.msg_id; body.in_reply_to = in_reply_to; self.send(&dest, body); } pub fn send(&self, dest: &str, body: Body) { let mut body = body; if body.msg_id == 0 { body.msg_id = self.next_msg_id(); } let msg = Message { src: self.node_id().to_string(), dest: dest.to_string(), body, }; let msg = serde_json::to_string(&msg).unwrap(); self.writeln(&msg); } fn writeln(&self, msg: &str) { let mut out = self.output.lock().unwrap(); let msg = format!("{msg}\n"); out.write_all(msg.as_bytes()).unwrap(); out.flush().unwrap(); } }