From e5150af666e0531a3a1615254718b63df117abb9 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Mon, 10 Jun 2024 12:34:24 -0700 Subject: [PATCH] simplify IO. --- src/kv.rs | 4 ++-- src/lib.rs | 55 +++++++++++++++++++++++++----------------------------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/src/kv.rs b/src/kv.rs index 434a397..ad67ede 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1,4 +1,4 @@ -use serde_json::Value; +use crate::Value; use crate::{check_err, mk_payload, Body, RpcResult, Runner}; @@ -37,7 +37,7 @@ impl Kv { /// let kv = Kv::seq(); /// let result = kv.read(&runner, "MY_KEY"); /// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok` - /// // and the body will be `Some(Value)`. + /// // and the body will always be `Some(Value)`. /// assert!(result.is_ok() && result.unwrap().is_some()); /// ``` pub fn read(&self, runner: &Runner, key: &str) -> RpcResult { diff --git a/src/lib.rs b/src/lib.rs index 2244ec6..1e95a88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,13 +58,13 @@ impl Runner { /// ```no_run /// use nebkor_maelstrom::{Body, Message, Node, Runner}; /// struct Foo; - /// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl here*/ }} + /// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }} /// /// let runner = Runner::new(Foo); /// /// let on_init = |rnr: &Runner| { /// eprintln!("received the `init` message!"); - /// let msg = Message { body: Body::from_type("ignore_me"), ..Default::default() }; + /// let msg = Message { body: Body::from_type("yo_yo_yo"), ..Default::default() }; /// // send `msg` to the node to be processed by its `handle()` method /// rnr.get_backdoor().send(msg).unwrap(); /// }; @@ -73,32 +73,19 @@ impl Runner { /// runner.run(Some(on_init)); /// ``` pub fn run(&self, on_init: Option) { - let (stdin_tx, stdin_rx) = channel(); - thread::spawn(move || { - let stdin = std::io::stdin().lock().lines(); - for line in stdin { - stdin_tx.send(line.unwrap()).unwrap(); - } - }); + let (outbound_tx, outbound_rx) = channel(); + let _ = self.outbound_tx.get_or_init(|| outbound_tx); - let (stdout_tx, stdout_rx) = channel(); + // decouple processing output from handling messages thread::spawn(move || { let mut stdout = std::io::stdout().lock(); - for msg in stdout_rx { + while let Ok(msg) = outbound_rx.recv() { + let msg = serde_json::to_string(&msg).unwrap(); writeln!(&mut stdout, "{msg}").unwrap(); } }); - let (outbound_tx, outbound_rx) = channel(); - let _ = self.outbound_tx.get_or_init(|| outbound_tx); - thread::spawn(move || { - while let Ok(msg) = outbound_rx.recv() { - let msg = serde_json::to_string(&msg).unwrap(); - stdout_tx.send(msg).unwrap(); - } - }); - - self.process_input(stdin_rx, on_init); + self.process_input(on_init); } /// Get a Sender that will send Messages to the node as input. Useful for triggering periodic @@ -175,23 +162,30 @@ impl Runner { let _ = self.nodes.get_or_init(|| nodes); } - fn process_input(&self, stdin_rx: Receiver, on_init: Option) { - let (json_tx, json_rx) = channel(); - let _ = self.backdoor.get_or_init(|| json_tx.clone()); + fn process_input(&self, on_init: Option) { + // for sending Messages to the node's inputs + let (encoded_input_tx, encoded_input_rx) = channel(); + let _ = self.backdoor.get_or_init(|| encoded_input_tx.clone()); + + // decouple stdin from processing let proms = self.promises.clone(); thread::spawn(move || { - for line in stdin_rx { + let stdin = std::io::stdin().lock(); + for line in stdin.lines().map_while(std::result::Result::ok) { let msg: Message = serde_json::from_str(&line).unwrap(); let irt = msg.body.in_reply_to; - if let Some(tx) = proms.lock().unwrap().remove(&irt) { - tx.send(msg).unwrap(); + if let Some(promise) = proms.lock().unwrap().remove(&irt) { + // this is the result of an RPC call + promise.send(msg).unwrap(); } else { - json_tx.send(msg).unwrap(); + // just let the node's `handle()` method handle it + encoded_input_tx.send(msg).unwrap(); } } }); - let msg = json_rx.recv().unwrap(); + // first Message is always `init`: + let msg = encoded_input_rx.recv().unwrap(); { self.init(&msg); let body = Body::from_type("init_ok"); @@ -201,8 +195,9 @@ impl Runner { } } + // every other message is for the node's handle() method let mut node = self.node.lock().unwrap(); - for msg in json_rx { + for msg in encoded_input_rx { node.handle(self, msg); } }