From 851e9a6711c6cc54625f96caadbc69266803115b Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Mon, 20 May 2024 12:42:11 -0700 Subject: [PATCH] Move all lib-y code into nebkor-maelstrom, have gg-echo use it fully. --- Cargo.lock | 20 +-- Cargo.toml | 2 +- README.md | 4 + gg-echo/Cargo.toml | 2 +- gg-echo/src/main.rs | 141 ++---------------- .../Cargo.toml | 5 +- nebkor-maelstrom/src/lib.rs | 117 +++++++++++++++ .../src/protocol.rs | 0 8 files changed, 152 insertions(+), 139 deletions(-) rename {maelstrom-protocol => nebkor-maelstrom}/Cargo.toml (64%) create mode 100644 nebkor-maelstrom/src/lib.rs rename maelstrom-protocol/src/lib.rs => nebkor-maelstrom/src/protocol.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 6255c6b..65ebd9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,7 +198,7 @@ dependencies = [ name = "gg-echo" version = "0.0.1" dependencies = [ - "maelstrom-protocol", + "nebkor-maelstrom", "serde_json", ] @@ -287,15 +287,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "maelstrom-protocol" -version = "0.0.1" -dependencies = [ - "serde", - "serde_json", - "serde_repr", -] - [[package]] name = "memchr" version = "2.7.2" @@ -322,6 +313,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nebkor-maelstrom" +version = "0.0.1" +dependencies = [ + "serde", + "serde_json", + "serde_repr", +] + [[package]] name = "num_cpus" version = "1.16.0" diff --git a/Cargo.toml b/Cargo.toml index 68a8a92..740404f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["maelstrom-protocol", "gg-echo", "gg-uid", "gg-broadcast"] +members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom"] resolver = "2" [workspace.package] diff --git a/README.md b/README.md index 0ba4603..212bdb0 100644 --- a/README.md +++ b/README.md @@ -1 +1,5 @@ Working through the [Fly.io distributed systems challenge](https://fly.io/dist-sys/), in Rust. + +The `nebkor-maelstrom` crate is meant to be roughly equivalent to the Go Maelstrom.Node package, and +provides serde-backed data structures for messages as well as runner for processing messages from +the Maelstrom network. See `gg-echo/src/main.rs` for the simplest possible use of it. diff --git a/gg-echo/Cargo.toml b/gg-echo/Cargo.toml index 26c9be9..6496998 100644 --- a/gg-echo/Cargo.toml +++ b/gg-echo/Cargo.toml @@ -6,4 +6,4 @@ authors.workspace = true [dependencies] serde_json.workspace = true -maelstrom-protocol = { path = "../maelstrom-protocol" } +nebkor-maelstrom = { path = "../nebkor-maelstrom" } diff --git a/gg-echo/src/main.rs b/gg-echo/src/main.rs index de56082..36b6144 100644 --- a/gg-echo/src/main.rs +++ b/gg-echo/src/main.rs @@ -1,17 +1,11 @@ -use maelstrom_protocol as proto; -use proto::{Body, Message}; -use std::{ - cell::OnceCell, - io::{BufRead, StdinLock, StdoutLock, Write}, - sync::Mutex, -}; +use std::{rc::Rc, sync::Mutex}; -fn main() { - let out = std::io::stdout().lock(); - let input = std::io::stdin().lock(); +use nebkor_maelstrom::{Body, Message, Node, Runner}; - let runner = &Runner::new(out); - let handler = |msg: &Message| { +struct Echo; + +impl Node for Echo { + fn handle(&mut self, runner: &Runner, msg: &Message) { let typ = &msg.body.typ; match typ.as_str() { "echo" => { @@ -20,120 +14,17 @@ fn main() { } _ => {} } - }; - runner.run(input, &handler); -} - -struct Runner<'io> { - msg_id: Mutex, - node_id: OnceCell, - nodes: OnceCell>, - output: Mutex>, -} - -impl<'io> Runner<'io> { - pub fn new(output: StdoutLock<'io>) -> Self { - Runner { - output: Mutex::new(output), - msg_id: Mutex::new(1), - nodes: OnceCell::new(), - node_id: OnceCell::new(), - } - } - - pub fn run(&self, input: StdinLock, handler: &dyn Fn(&Message)) { - for line in input.lines() { - match line { - Ok(line) => { - if let Ok(msg) = serde_json::from_str::(&line) { - let typ = &msg.body.typ; - match typ.as_str() { - "init" => { - self.init(&msg); - - let body = Body::from_type("init_ok"); - self.reply(&msg, body); - } - _ => { - handler(&msg); - } - } - } - } - _ => {} - } - } - } - - pub fn node_id(&self) -> String { - self.node_id.get().cloned().unwrap_or("".to_string()) - } - - pub fn msg_id(&self) -> u64 { - *self.msg_id.lock().unwrap() - } - - pub fn init(&self, msg: &Message) { - let node_id = msg - .body - .payload - .get("node_id") - .unwrap() - .as_str() - .unwrap() - .to_owned(); - let nodes = msg - .body - .payload - .get("node_ids") - .unwrap() - .as_array() - .unwrap() - .iter() - .map(|s| s.as_str().unwrap().to_string()) - .collect::>(); - - let _ = self.node_id.get_or_init(|| node_id.to_owned()); - let _ = self.nodes.get_or_init(|| nodes.to_vec()); - } - - pub fn reply(&self, req: &Message, body: Body) { - let mut body = body; - let src = self.node_id.get().unwrap().to_owned(); - let dest = req.src.clone(); - let in_reply_to = req.body.msg_id; - body.in_reply_to = in_reply_to; - let msg = Message { src, dest, body }; - self.send(msg); - } - - pub fn send(&self, msg: Message) { - let mut msg = msg; - if msg.body.msg_id == 0 { - let mid = { - let mut g = self.msg_id.lock().unwrap(); - let m = *g; - *g += 1; - m - }; - msg.body.msg_id = mid; - } - let msg = serde_json::to_string(&msg).unwrap(); - let msg = format!("{msg}\n"); - self.writeln(&msg); - } - - fn writeln(&self, msg: &str) { - let mut out = self.output.lock().unwrap(); - out.write_all(msg.as_bytes()).unwrap(); - out.flush().unwrap(); } } -fn message(dest: &str, src: &str, body: Body) -> Message { - Message { - dest: dest.to_owned(), - src: src.to_owned(), - body, - } +fn main() { + let out = std::io::stdout().lock(); + let input = std::io::stdin().lock(); + + let node = Echo; + let node = Rc::new(Mutex::new(node)); + + let runner = Runner::new(out, node); + + runner.run(input); } diff --git a/maelstrom-protocol/Cargo.toml b/nebkor-maelstrom/Cargo.toml similarity index 64% rename from maelstrom-protocol/Cargo.toml rename to nebkor-maelstrom/Cargo.toml index 42ca124..456c9dc 100644 --- a/maelstrom-protocol/Cargo.toml +++ b/nebkor-maelstrom/Cargo.toml @@ -1,10 +1,11 @@ [package] -name = "maelstrom-protocol" +name = "nebkor-maelstrom" edition = "2021" version.workspace = true authors.workspace = true +license-file.workspace = true [dependencies] +serde_json.workspace = true serde = { version = "1", default-features = false, features = ["derive"] } -serde_json = { version = "1", default-features = false, features = ["std"] } serde_repr = "0.1" diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs new file mode 100644 index 0000000..2bb82bc --- /dev/null +++ b/nebkor-maelstrom/src/lib.rs @@ -0,0 +1,117 @@ +use std::{ + cell::OnceCell, + io::{BufRead, StdinLock, StdoutLock, Write}, + rc::Rc, + sync::{ + atomic::{AtomicU64, Ordering}, + Mutex, + }, +}; + +pub mod protocol; +pub use protocol::{Body, Message}; + +pub type DynNode = Rc>; + +pub trait Node { + fn handle(&mut self, runner: &Runner, msg: &Message); +} + +pub struct Runner<'io> { + msg_id: AtomicU64, + node: DynNode, + node_id: OnceCell, + nodes: OnceCell>, + output: Mutex>, + steps: AtomicU64, +} + +impl<'io> Runner<'io> { + pub fn new(output: StdoutLock<'io>, node: DynNode) -> Self { + Runner { + output: Mutex::new(output), + node, + msg_id: AtomicU64::new(1), + nodes: OnceCell::new(), + node_id: OnceCell::new(), + steps: AtomicU64::new(0), + } + } + + pub fn run(&self, input: StdinLock) { + for line in input.lines().map_while(Result::ok) { + if let Ok(msg) = serde_json::from_str::(&line) { + let typ = &msg.body.typ; + if let "init" = typ.as_str() { + self.init(&msg); + let body = Body::from_type("init_ok"); + self.reply(&msg, body); + } else { + let mut n = self.node.lock().unwrap(); + n.handle(self, &msg); + } + } + self.steps.fetch_add(1, Ordering::SeqCst); + } + } + + pub fn node_id(&self) -> String { + self.node_id.get().cloned().unwrap_or_default() + } + + pub fn next_msg_id(&self) -> u64 { + self.msg_id.fetch_add(1, Ordering::SeqCst) + } + + pub fn init(&self, msg: &Message) { + let node_id = msg + .body + .payload + .get("node_id") + .unwrap() + .as_str() + .unwrap() + .to_owned(); + + let nodes = msg + .body + .payload + .get("node_ids") + .unwrap() + .as_array() + .unwrap() + .iter() + .map(|s| s.as_str().unwrap().to_string()) + .collect::>(); + + let _ = self.node_id.get_or_init(|| node_id.to_owned()); + let _ = self.nodes.get_or_init(|| nodes.to_vec()); + } + + pub fn reply(&self, req: &Message, body: Body) { + let mut body = body; + let src = self.node_id.get().unwrap().to_owned(); + let dest = req.src.clone(); + let in_reply_to = req.body.msg_id; + body.in_reply_to = in_reply_to; + let msg = Message { src, dest, body }; + self.send(msg); + } + + pub fn send(&self, msg: Message) { + let mut msg = msg; + if msg.body.msg_id == 0 { + let mid = self.next_msg_id(); + msg.body.msg_id = mid; + } + let msg = serde_json::to_string(&msg).unwrap(); + self.writeln(&msg); + } + + fn writeln(&self, msg: &str) { + let mut out = self.output.lock().unwrap(); + let msg = format!("{msg}\n"); + out.write_all(msg.as_bytes()).unwrap(); + out.flush().unwrap(); + } +} diff --git a/maelstrom-protocol/src/lib.rs b/nebkor-maelstrom/src/protocol.rs similarity index 100% rename from maelstrom-protocol/src/lib.rs rename to nebkor-maelstrom/src/protocol.rs