From c45b179de45ba7e03a884d6c7cdb4c1c2625ae20 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Wed, 5 Jun 2024 16:13:43 -0700 Subject: [PATCH] Add examples and documentation, bump version. --- Cargo.toml | 2 +- examples/broadcast.rs | 56 ++++++++++++++++++++++++++ examples/echo.rs | 21 ++++++++++ src/kv.rs | 22 ++++++++++ src/lib.rs | 46 ++++++++++++++++----- src/protocol.rs | 94 +++++++++++++++++++++++++++++++++---------- 6 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 examples/broadcast.rs create mode 100644 examples/echo.rs diff --git a/Cargo.toml b/Cargo.toml index eda4254..fb41cda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "nebkor-maelstrom" edition = "2021" -version = "0.0.1" +version = "0.0.2" license-file = "LICENSE.md" readme = "README.md" description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients." diff --git a/examples/broadcast.rs b/examples/broadcast.rs new file mode 100644 index 0000000..447f11d --- /dev/null +++ b/examples/broadcast.rs @@ -0,0 +1,56 @@ +use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner}; + +use std::{collections::HashSet, thread, time::Duration}; + +fn main() { + let node = SingleNodeBroadcast::default(); + + let on_init = |runner: &Runner| { + let backdoor = runner.get_backdoor(); + let msg = Message::from_body(Body::from_type("tony_danza")); + thread::spawn(move || { + // send a `tony_danza` message to the broadcast node every 100ms + thread::sleep(Duration::from_millis(100)); + backdoor.send(msg.clone()).unwrap(); + }); + }; + let on_init = Box::new(on_init); + + let runner = Runner::new(node); + runner.run(Some(on_init)); +} + +#[derive(Clone, Default)] +struct SingleNodeBroadcast { + pub store: HashSet, +} + +impl Node for SingleNodeBroadcast { + fn handle(&mut self, runner: &Runner, req: Message) { + let typ = req.typ(); + + match typ { + "broadcast" => { + let val = req.body.payload.get("message").unwrap().as_i64().unwrap(); + self.store.insert(val); + // maybe before replying with "broadcast_ok" maybe consider telling your neighbors + // about what you heard? maybe? + runner.reply(&req, Body::from_type("broadcast_ok")); + } + "read" => { + let vals = self.store.iter().cloned().collect::>(); + let payload = mk_payload(&[("messages", vals.into())]); + let body = Body::from_type("read_ok").with_payload(payload); + runner.reply(&req, body); + } + "topology" => { + let body = Body::from_type("topology_ok"); + runner.reply(&req, body); + } + "tony_danza" => { + eprintln!("Eh, oh, oh, eh!"); + } + _ => {} + } + } +} diff --git a/examples/echo.rs b/examples/echo.rs new file mode 100644 index 0000000..681d898 --- /dev/null +++ b/examples/echo.rs @@ -0,0 +1,21 @@ +use nebkor_maelstrom::{Body, Message, Node, Runner}; + +struct Echo; + +impl Node for Echo { + fn handle(&mut self, runner: &Runner, msg: Message) { + let typ = &msg.body.typ; + if typ.as_str() == "echo" { + let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); + runner.reply(&msg, body); + } + } +} + +fn main() { + let node = Echo; + + let runner = Runner::new(node); + + runner.run(None); +} diff --git a/src/kv.rs b/src/kv.rs index dd841e6..571677b 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -2,24 +2,44 @@ use serde_json::Value; use crate::{check_err, mk_payload, Body, RpcResult, Runner}; +/// A convenient way to do RPC with Maelstrom's KV services. #[derive(Debug, Default, Clone)] pub struct Kv { pub service: &'static str, } impl Kv { + /// Construct a proxy to the `seq-kv` service. pub fn seq() -> Self { Kv { service: "seq-kv" } } + /// Construct a proxy to the `lin-kv` service. pub fn lin() -> Self { Kv { service: "lin-kv" } } + /// Construct a proxy to the `lww-kv` service. pub fn lww() -> Self { Kv { service: "lww-kv" } } + /// Returns the `Value` from a remote call to the KV service for key `key`. + /// + /// # Examples + /// + /// ```no_run + /// # use nebkor_maelstrom::{kv::Kv, Node, Runner, Message}; + /// # struct Foo; + /// # impl Node for Foo {fn handle(&mut self, runner: &Runner, msg: Message) {}} + /// # let runner = Runner::new(Foo); + /// // get a proxy to the `seq-kv` service + /// let kv = Kv::seq(); + /// let result = kv.read(&runner, "MY_KEY"); + /// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok` + /// // and the body will be `Some(Value)`. + /// assert!(result.is_ok() && result.unwrap().is_some()); + /// ``` pub fn read(&self, runner: &Runner, key: &str) -> RpcResult { let payload = mk_payload(&[("key", key.into())]); let body = Body::from_type("read").with_payload(payload); @@ -29,6 +49,7 @@ impl Kv { Ok(Some(msg.body.payload.get("value").unwrap().to_owned())) } + /// The success value is `None`. pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult { let payload = mk_payload(&[("key", key.into()), ("value", val)]); let body = Body::from_type("write").with_payload(payload); @@ -37,6 +58,7 @@ impl Kv { Ok(None) } + /// The success value is `None`. pub fn cas( &self, runner: &Runner, diff --git a/src/lib.rs b/src/lib.rs index 45ab037..df73a4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,29 @@ impl Runner { } } + /// Start processing messages from stdin and sending them to your node. The `on_init` argument + /// is an optional callback that will be called with `&self` after the `init` message from + /// Maelstrom has been processed. + /// + /// # Examples + /// + /// ```no_run + /// use nebkor_maelstrom::{Body, Message, Node, Runner}; + /// struct Foo; + /// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl here*/ }} + /// + /// let runner = Runner::new(Foo); + /// + /// let on_init = |rnr: &Runner| { + /// eprintln!("received the `init` message!"); + /// let msg = Message { body: Body::from_type("ignore_me"), ..Default::default() }; + /// // send `msg` to the node to be processed by its `handle()` method + /// rnr.get_backdoor().send(msg).unwrap(); + /// }; + /// let on_init = Box::new(on_init); + /// + /// runner.run(Some(on_init)); + /// ``` pub fn run(&self, on_init: Option) { let (stdin_tx, stdin_rx) = channel(); thread::spawn(move || { @@ -78,26 +101,27 @@ impl Runner { self.process_input(stdin_rx, on_init); } + /// Get a Sender that will send Messages to the node as input. Useful for triggering periodic + /// behavior from a separate thread, or for sending a Message to the node from `on_init`. See + /// the `broadcast` example for a use of it. pub fn get_backdoor(&self) -> Sender { self.backdoor.get().unwrap().clone() } - pub fn node_id(&self) -> String { - self.node_id.get().cloned().unwrap_or_default() + pub fn node_id(&self) -> &str { + self.node_id.get().unwrap() } pub fn next_msg_id(&self) -> u64 { self.msg_id.fetch_add(1, Ordering::SeqCst) } - pub fn cur_msg_id(&self) -> u64 { - self.msg_id.load(Ordering::SeqCst) - } - + /// A list of all nodes in the network, including this one. pub fn nodes(&self) -> &[String] { self.nodes.get().unwrap() } + /// Construct a new `Message` from `body` and send it to `req.src`. pub fn reply(&self, req: &Message, body: Body) { let mut body = body; let dest = req.src.as_str(); @@ -106,11 +130,13 @@ impl Runner { self.send(dest, body); } + /// Construct a new `Message` from `body` and send it to `dest`. pub fn send(&self, dest: &str, body: Body) { let msg = self.mk_msg(dest, body); self.outbound_tx.get().unwrap().send(msg).unwrap(); } + /// Returns a Receiver that will receive the reply from the request. pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { let msg = self.mk_msg(dest, body); let (tx, rx) = channel(); @@ -186,11 +212,9 @@ impl Runner { if body.msg_id == 0 { body.msg_id = self.next_msg_id(); } - Message { - src: self.node_id().to_string(), - dest: dest.to_string(), - body, - } + Message::from_dest(dest) + .with_body(body) + .with_src(self.node_id()) } } diff --git a/src/protocol.rs b/src/protocol.rs index 6f35178..97bbc8d 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -11,6 +11,55 @@ pub struct Message { pub body: Body, } +impl Message { + /// `src` and `dest` will be empty. + pub fn from_body(body: Body) -> Self { + Message { + body, + ..Default::default() + } + } + + pub fn with_body(self, body: Body) -> Self { + let mut m = self; + m.body = body; + m + } + + pub fn from_dest(dest: &str) -> Self { + Message { + dest: dest.to_string(), + ..Default::default() + } + } + + pub fn with_dest(self, dest: &str) -> Self { + let mut m = self; + m.dest = dest.to_string(); + m + } + + pub fn from_src(src: &str) -> Self { + Message { + src: src.to_string(), + ..Default::default() + } + } + + pub fn with_src(self, src: &str) -> Self { + let mut m = self; + m.src = src.to_string(); + m + } + + /// The Maelstrom type of a Body (and hence a Message) is just a string. This is for the sake of + /// ease of use for doing the Gossip Glomers challenges; this crate is not meant to be a real + /// network client framework. + pub fn typ(&self) -> &str { + self.body.typ.as_str() + } +} + #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct Body { #[serde(rename = "type")] @@ -53,23 +102,29 @@ impl Body { b.payload = payload; b } + + pub fn error(code: ErrorCode, in_reply_to: u64, text: Option<&str>) -> Self { + Body { + in_reply_to, + typ: "error".to_string(), + code: Some(code), + text: text.map(|t| t.to_string()), + ..Default::default() + } + } } -pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body { - Body::from_type("init_ok") - .with_msg_id(msg_id) - .with_in_reply_to(in_reply_to) -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] #[serde(untagged)] pub enum ErrorCode { Definite(DefiniteError), Indefinite(IndefiniteError), } -#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] -#[repr(u64)] +#[derive( + Debug, Clone, Copy, Serialize_repr, Deserialize_repr, PartialEq, Eq, PartialOrd, Ord, Hash, +)] +#[repr(u16)] pub enum DefiniteError { NodeNotFound = 2, NotSupported = 10, @@ -82,24 +137,15 @@ pub enum DefiniteError { TxnConflict = 30, } -#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] -#[repr(u64)] +#[derive( + Debug, Clone, Copy, Serialize_repr, Deserialize_repr, PartialEq, Eq, PartialOrd, Ord, Hash, +)] +#[repr(u16)] pub enum IndefiniteError { Timeout = 0, Crash = 13, } -pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body { - Body { - typ: "error".to_string(), - msg_id, - in_reply_to, - code: Some(code), - text: text.map(|t| t.to_string()), - payload: Default::default(), - } -} - #[allow(clippy::trivially_copy_pass_by_ref)] fn u64_zero_by_ref(num: &u64) -> bool { *num == 0 @@ -114,5 +160,9 @@ mod test { let ec = ErrorCode::Definite(DefiniteError::Abort); let e = serde_json::to_string(&ec).unwrap(); assert_eq!(&e, "14"); + let s: ErrorCode = serde_json::from_str(&e).unwrap(); + assert_eq!(s, ec); + let n: ErrorCode = serde_json::from_value(14u16.into()).unwrap(); + assert_eq!(n, ec); } }