From e92af7cf925055540d12f034674af45772936522 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Tue, 4 Jun 2024 10:21:36 -0700 Subject: [PATCH] remove Network struct --- gg-broadcast/src/main.rs | 2 - gg-echo/src/main.rs | 4 -- gg-g_counter/src/main.rs | 3 -- gg-uid/src/main.rs | 3 -- nebkor-maelstrom/README.md | 10 ++--- nebkor-maelstrom/src/lib.rs | 81 ++++++++++++------------------------- 6 files changed, 28 insertions(+), 75 deletions(-) diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index 1842a0d..f76f493 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -1,6 +1,5 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, thread, time::Duration, }; @@ -10,7 +9,6 @@ use rand::Rng; fn main() { let node = BCaster::default(); - let node = Arc::new(Mutex::new(node)); let runner = Runner::new(node); diff --git a/gg-echo/src/main.rs b/gg-echo/src/main.rs index f781cfe..681d898 100644 --- a/gg-echo/src/main.rs +++ b/gg-echo/src/main.rs @@ -1,5 +1,3 @@ -use std::sync::{Arc, Mutex}; - use nebkor_maelstrom::{Body, Message, Node, Runner}; struct Echo; @@ -17,8 +15,6 @@ impl Node for Echo { fn main() { let node = Echo; - let node = Arc::new(Mutex::new(node)); - let runner = Runner::new(node); runner.run(None); diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index ef3cc66..4e5b32c 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -1,5 +1,3 @@ -use std::sync::{Arc, Mutex}; - use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner}; const KEY: &str = "COUNTER"; @@ -9,7 +7,6 @@ struct Counter; fn main() { let node = Counter; - let node = Arc::new(Mutex::new(node)); let runner = Runner::new(node); diff --git a/gg-uid/src/main.rs b/gg-uid/src/main.rs index fa795a4..4d3f712 100644 --- a/gg-uid/src/main.rs +++ b/gg-uid/src/main.rs @@ -1,10 +1,7 @@ -use std::sync::{Arc, Mutex}; - use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner}; fn main() { let node = GenUid; - let node = Arc::new(Mutex::new(node)); let runner = Runner::new(node); diff --git a/nebkor-maelstrom/README.md b/nebkor-maelstrom/README.md index 7de1e10..dc03b8d 100644 --- a/nebkor-maelstrom/README.md +++ b/nebkor-maelstrom/README.md @@ -11,8 +11,6 @@ distributed actors. It has three dependencies: For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program: ``` rust -use std::sync::{Arc, Mutex}; - use nebkor_maelstrom::{Body, Message, Node, Runner}; struct Echo; @@ -30,8 +28,6 @@ impl Node for Echo { fn main() { let node = Echo; - let node = Arc::new(Mutex::new(node)); - let runner = Runner::new(node); runner.run(None); @@ -44,9 +40,9 @@ Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a `handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like `send`, `reply`, and `rpc`. -In your main function, instantiate that struct and wrap it in an `Arc>`, then pass that into -`Runner::new()` to get a Runner. The `run()` method takes an optional closure that will be run when -the `init` Message is received; see the +In your main function, instantiate that struct and pass that into `Runner::new()` to get a +Runner. The `run()` method takes an optional closure that will be run when the `init` Message is +received; see the [broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/fff7fdc2d52f7d4c2d4d9c581ea16cdf0e1e3f30/gg-broadcast/src/main.rs#L18-L33) program for an example of that, where it spawns a thread to send periodic messages to the node. diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index fe12e56..62dccde 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -25,61 +25,33 @@ pub trait Node { fn handle(&mut self, runner: &Runner, msg: Message); } -#[derive(Debug, Clone)] -pub struct Network { - promises: Arc>>>, - node_output_tx: Sender, -} - -impl Network { - pub fn new() -> (Self, Receiver) { - let (node_output_tx, node_output_rx) = channel(); - let net = Self { - node_output_tx, - promises: Arc::new(Mutex::new(HashMap::default())), - }; - (net, node_output_rx) - } - - pub fn send(&self, msg: Message) { - self.node_output_tx.send(msg).unwrap(); - } - - pub fn rpc(&self, msg: Message) -> RpcPromise { - let (tx, rx) = channel(); - { - let msg_id = msg.body.msg_id; - let mut g = self.promises.lock().unwrap(); - g.insert(msg_id, tx); - } - self.send(msg); - rx - } -} - pub struct Runner { node: NodeyNodeFace, node_id: OnceLock, nodes: OnceLock>, - network: OnceLock, backdoor: OnceLock>, + promises: Arc>>>, + outbound_tx: OnceLock>, } impl Runner { - pub fn new(node: NodeyNodeFace) -> Self { + pub fn new(node: N) -> Self { + let node = Arc::new(Mutex::new(node)); Runner { node, nodes: OnceLock::new(), node_id: OnceLock::new(), - network: OnceLock::new(), backdoor: OnceLock::new(), + outbound_tx: OnceLock::new(), + promises: Default::default(), } } pub fn run(&self, on_init: Option) { - let (stdin_tx, stdin_rx) = channel(); - let (stdout_tx, stdout_rx) = channel(); + let (outbound_tx, outbound_rx) = channel(); + let _ = self.outbound_tx.get_or_init(|| outbound_tx); + let (stdin_tx, stdin_rx) = channel(); thread::spawn(move || { let stdin = std::io::stdin().lock().lines(); for line in stdin { @@ -87,6 +59,7 @@ impl Runner { } }); + let (stdout_tx, stdout_rx) = channel(); thread::spawn(move || { let mut stdout = std::io::stdout().lock(); for msg in stdout_rx { @@ -94,30 +67,19 @@ impl Runner { } }); - self.run_internal(stdout_tx, stdin_rx, on_init); + self.run_output(stdout_tx, outbound_rx); + self.run_input(stdin_rx, on_init); } - fn run_internal( - &self, - stdout_tx: Sender, - stdin_rx: Receiver, - on_init: Option, - ) { - let (network, node_receiver) = Network::new(); - let _ = self.network.get_or_init(|| network.clone()); - self.run_output(stdout_tx, node_receiver); - self.run_input(stdin_rx, network, on_init); - } - - fn run_input(&self, stdin_rx: Receiver, network: Network, on_init: Option) { + fn run_input(&self, stdin_rx: Receiver, on_init: Option) { let (json_tx, json_rx) = channel(); let _ = self.backdoor.get_or_init(|| json_tx.clone()); - + let proms = self.promises.clone(); thread::spawn(move || { for line in stdin_rx { let msg: Message = serde_json::from_str(&line).unwrap(); let irt = msg.body.in_reply_to; - if let Some(tx) = network.promises.lock().unwrap().remove(&irt) { + if let Some(tx) = proms.lock().unwrap().remove(&irt) { tx.send(msg).unwrap(); } else { json_tx.send(msg).unwrap(); @@ -126,7 +88,6 @@ impl Runner { }); for msg in json_rx { - let mut node = self.node.lock().unwrap(); if msg.body.typ.as_str() == "init" { self.init(&msg); let body = Body::from_type("init_ok"); @@ -135,6 +96,7 @@ impl Runner { on_init(self); } } else { + let mut node = self.node.lock().unwrap(); node.handle(self, msg); } } @@ -212,7 +174,7 @@ impl Runner { dest: dest.to_string(), body, }; - self.network.get().unwrap().send(msg); + self.outbound_tx.get().unwrap().send(msg).unwrap(); } pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { @@ -225,7 +187,14 @@ impl Runner { dest: dest.to_string(), body, }; - self.network.get().unwrap().rpc(msg) + let (tx, rx) = channel(); + { + let msg_id = msg.body.msg_id; + let mut g = self.promises.lock().unwrap(); + g.insert(msg_id, tx); + } + self.outbound_tx.get().unwrap().send(msg).unwrap(); + rx } }