use std::{ collections::HashMap, io::{BufRead, Write}, sync::{ atomic::{AtomicU64, Ordering}, mpsc::{channel, Receiver, Sender}, Arc, Mutex, OnceLock, }, thread::{self}, }; pub use serde_json::Value; pub mod protocol; pub use protocol::{Body, ErrorCode, Message, Payload}; pub mod kv; pub type NodeyNodeFace = Arc>; pub type OnInit = Box; pub type RpcPromise = Receiver; pub type RpcResult = std::result::Result, ErrorCode>; pub trait Node { fn handle(&mut self, runner: &Runner, msg: Message); } pub struct Runner { node: NodeyNodeFace, node_id: OnceLock, nodes: OnceLock>, backdoor: OnceLock>, promises: Arc>>>, outbound_tx: OnceLock>, msg_id: AtomicU64, } impl Runner { pub fn new(node: N) -> Self { let node = Arc::new(Mutex::new(node)); Runner { node, nodes: OnceLock::new(), node_id: OnceLock::new(), backdoor: OnceLock::new(), outbound_tx: OnceLock::new(), promises: Default::default(), msg_id: AtomicU64::new(1), } } /// Start processing messages from stdin and sending them to your node. The `on_init` argument /// is an optional callback that will be called with `&self` after the `init` message from /// Maelstrom has been processed. /// /// # Examples /// /// ```no_run /// use nebkor_maelstrom::{Body, Message, Node, Runner}; /// struct Foo; /// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl here*/ }} /// /// let runner = Runner::new(Foo); /// /// let on_init = |rnr: &Runner| { /// eprintln!("received the `init` message!"); /// let msg = Message { body: Body::from_type("ignore_me"), ..Default::default() }; /// // send `msg` to the node to be processed by its `handle()` method /// rnr.get_backdoor().send(msg).unwrap(); /// }; /// let on_init = Box::new(on_init); /// /// runner.run(Some(on_init)); /// ``` pub fn run(&self, on_init: Option) { let (stdin_tx, stdin_rx) = channel(); thread::spawn(move || { let stdin = std::io::stdin().lock().lines(); for line in stdin { stdin_tx.send(line.unwrap()).unwrap(); } }); let (stdout_tx, stdout_rx) = channel(); thread::spawn(move || { let mut stdout = std::io::stdout().lock(); for msg in stdout_rx { writeln!(&mut stdout, "{msg}").unwrap(); } }); let (outbound_tx, outbound_rx) = channel(); let _ = self.outbound_tx.get_or_init(|| outbound_tx); thread::spawn(move || { while let Ok(msg) = outbound_rx.recv() { let msg = serde_json::to_string(&msg).unwrap(); stdout_tx.send(msg).unwrap(); } }); self.process_input(stdin_rx, on_init); } /// Get a Sender that will send Messages to the node as input. Useful for triggering periodic /// behavior from a separate thread, or for sending a Message to the node from `on_init`. See /// the `broadcast` example for a use of it. pub fn get_backdoor(&self) -> Sender { self.backdoor.get().unwrap().clone() } pub fn node_id(&self) -> &str { self.node_id.get().unwrap() } pub fn next_msg_id(&self) -> u64 { self.msg_id.fetch_add(1, Ordering::SeqCst) } /// A list of all nodes in the network, including this one. pub fn nodes(&self) -> &[String] { self.nodes.get().unwrap() } /// Construct a new `Message` from `body` and send it to `req.src`. pub fn reply(&self, req: &Message, body: Body) { let mut body = body; let dest = req.src.as_str(); let in_reply_to = req.body.msg_id; body.in_reply_to = in_reply_to; self.send(dest, body); } /// Construct a new `Message` from `body` and send it to `dest`. pub fn send(&self, dest: &str, body: Body) { let msg = self.mk_msg(dest, body); self.outbound_tx.get().unwrap().send(msg).unwrap(); } /// Returns a Receiver that will receive the reply from the request. pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { let msg = self.mk_msg(dest, body); let (tx, rx) = channel(); { let msg_id = msg.body.msg_id; let mut g = self.promises.lock().unwrap(); g.insert(msg_id, tx); } self.outbound_tx.get().unwrap().send(msg).unwrap(); rx } // internal methods fn init(&self, msg: &Message) { let node_id = msg .body .payload .get("node_id") .unwrap() .as_str() .unwrap() .to_owned(); let nodes = msg .body .payload .get("node_ids") .unwrap() .as_array() .unwrap() .iter() .map(|s| s.as_str().unwrap().to_string()) .collect(); let _ = self.node_id.get_or_init(|| node_id); let _ = self.nodes.get_or_init(|| nodes); } fn process_input(&self, stdin_rx: Receiver, on_init: Option) { let (json_tx, json_rx) = channel(); let _ = self.backdoor.get_or_init(|| json_tx.clone()); let proms = self.promises.clone(); thread::spawn(move || { for line in stdin_rx { let msg: Message = serde_json::from_str(&line).unwrap(); let irt = msg.body.in_reply_to; if let Some(tx) = proms.lock().unwrap().remove(&irt) { tx.send(msg).unwrap(); } else { json_tx.send(msg).unwrap(); } } }); let msg = json_rx.recv().unwrap(); { self.init(&msg); let body = Body::from_type("init_ok"); self.reply(&msg, body); if let Some(on_init) = on_init { on_init(self); } } let mut node = self.node.lock().unwrap(); for msg in json_rx { node.handle(self, msg); } } fn mk_msg(&self, dest: &str, body: Body) -> Message { let mut body = body; if body.msg_id == 0 { body.msg_id = self.next_msg_id(); } Message::from_dest(dest) .with_body(body) .with_src(self.node_id()) } } pub fn check_err(msg: &Message) -> RpcResult { if msg.body.typ.as_str() == "error" { let error = msg.body.code.unwrap(); return Err(error); } Ok(None) } pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { payload .iter() .map(|p| (p.0.to_string(), p.1.clone())) .collect() }