diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index 6ee5d04..2482829 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -44,8 +44,8 @@ impl Node for Counter { let v = kv.read(runner, KEY).unwrap().unwrap(); if kv.cas(runner, KEY, v.clone(), v.clone(), false).is_ok() { rounds += 1; + val = v; } - val = v; } let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)])); diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index a4dded8..45ab037 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -50,9 +50,6 @@ impl Runner { } pub fn run(&self, on_init: Option) { - let (outbound_tx, outbound_rx) = channel(); - let _ = self.outbound_tx.get_or_init(|| outbound_tx); - let (stdin_tx, stdin_rx) = channel(); thread::spawn(move || { let stdin = std::io::stdin().lock().lines(); @@ -69,8 +66,16 @@ impl Runner { } }); - self.run_output(stdout_tx, outbound_rx); - self.run_input(stdin_rx, on_init); + let (outbound_tx, outbound_rx) = channel(); + let _ = self.outbound_tx.get_or_init(|| outbound_tx); + thread::spawn(move || { + while let Ok(msg) = outbound_rx.recv() { + let msg = serde_json::to_string(&msg).unwrap(); + stdout_tx.send(msg).unwrap(); + } + }); + + self.process_input(stdin_rx, on_init); } pub fn get_backdoor(&self) -> Sender { @@ -144,7 +149,7 @@ impl Runner { let _ = self.nodes.get_or_init(|| nodes); } - fn run_input(&self, stdin_rx: Receiver, on_init: Option) { + fn process_input(&self, stdin_rx: Receiver, on_init: Option) { let (json_tx, json_rx) = channel(); let _ = self.backdoor.get_or_init(|| json_tx.clone()); let proms = self.promises.clone(); @@ -160,28 +165,20 @@ impl Runner { } }); - for msg in json_rx { - if msg.body.typ.as_str() == "init" { - self.init(&msg); - let body = Body::from_type("init_ok"); - self.reply(&msg, body); - if let Some(ref on_init) = on_init { - on_init(self); - } - } else { - let mut node = self.node.lock().unwrap(); - node.handle(self, msg); + let msg = json_rx.recv().unwrap(); + { + self.init(&msg); + let body = Body::from_type("init_ok"); + self.reply(&msg, body); + if let Some(on_init) = on_init { + on_init(self); } } - } - fn run_output(&self, stdout_tx: Sender, node_output_rx: Receiver) { - thread::spawn(move || { - while let Ok(msg) = node_output_rx.recv() { - let msg = serde_json::to_string(&msg).unwrap(); - stdout_tx.send(msg).unwrap(); - } - }); + let mut node = self.node.lock().unwrap(); + for msg in json_rx { + node.handle(self, msg); + } } fn mk_msg(&self, dest: &str, body: Body) -> Message {