Add examples and documentation, bump version.

This commit is contained in:
Joe Ardent 2024-06-05 16:13:43 -07:00
parent c6c05b41b4
commit c45b179de4
6 changed files with 207 additions and 34 deletions

View File

@ -1,7 +1,7 @@
[package] [package]
name = "nebkor-maelstrom" name = "nebkor-maelstrom"
edition = "2021" edition = "2021"
version = "0.0.1" version = "0.0.2"
license-file = "LICENSE.md" license-file = "LICENSE.md"
readme = "README.md" readme = "README.md"
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients." description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."

56
examples/broadcast.rs Normal file
View File

@ -0,0 +1,56 @@
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
use std::{collections::HashSet, thread, time::Duration};
fn main() {
let node = SingleNodeBroadcast::default();
let on_init = |runner: &Runner| {
let backdoor = runner.get_backdoor();
let msg = Message::from_body(Body::from_type("tony_danza"));
thread::spawn(move || {
// send a `tony_danza` message to the broadcast node every 100ms
thread::sleep(Duration::from_millis(100));
backdoor.send(msg.clone()).unwrap();
});
};
let on_init = Box::new(on_init);
let runner = Runner::new(node);
runner.run(Some(on_init));
}
#[derive(Clone, Default)]
struct SingleNodeBroadcast {
pub store: HashSet<i64>,
}
impl Node for SingleNodeBroadcast {
fn handle(&mut self, runner: &Runner, req: Message) {
let typ = req.typ();
match typ {
"broadcast" => {
let val = req.body.payload.get("message").unwrap().as_i64().unwrap();
self.store.insert(val);
// maybe before replying with "broadcast_ok" maybe consider telling your neighbors
// about what you heard? maybe?
runner.reply(&req, Body::from_type("broadcast_ok"));
}
"read" => {
let vals = self.store.iter().cloned().collect::<Vec<_>>();
let payload = mk_payload(&[("messages", vals.into())]);
let body = Body::from_type("read_ok").with_payload(payload);
runner.reply(&req, body);
}
"topology" => {
let body = Body::from_type("topology_ok");
runner.reply(&req, body);
}
"tony_danza" => {
eprintln!("Eh, oh, oh, eh!");
}
_ => {}
}
}
}

21
examples/echo.rs Normal file
View File

@ -0,0 +1,21 @@
use nebkor_maelstrom::{Body, Message, Node, Runner};
struct Echo;
impl Node for Echo {
fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = &msg.body.typ;
if typ.as_str() == "echo" {
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
runner.reply(&msg, body);
}
}
}
fn main() {
let node = Echo;
let runner = Runner::new(node);
runner.run(None);
}

View File

@ -2,24 +2,44 @@ use serde_json::Value;
use crate::{check_err, mk_payload, Body, RpcResult, Runner}; use crate::{check_err, mk_payload, Body, RpcResult, Runner};
/// A convenient way to do RPC with Maelstrom's KV services.
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct Kv { pub struct Kv {
pub service: &'static str, pub service: &'static str,
} }
impl Kv { impl Kv {
/// Construct a proxy to the `seq-kv` service.
pub fn seq() -> Self { pub fn seq() -> Self {
Kv { service: "seq-kv" } Kv { service: "seq-kv" }
} }
/// Construct a proxy to the `lin-kv` service.
pub fn lin() -> Self { pub fn lin() -> Self {
Kv { service: "lin-kv" } Kv { service: "lin-kv" }
} }
/// Construct a proxy to the `lww-kv` service.
pub fn lww() -> Self { pub fn lww() -> Self {
Kv { service: "lww-kv" } Kv { service: "lww-kv" }
} }
/// Returns the `Value` from a remote call to the KV service for key `key`.
///
/// # Examples
///
/// ```no_run
/// # use nebkor_maelstrom::{kv::Kv, Node, Runner, Message};
/// # struct Foo;
/// # impl Node for Foo {fn handle(&mut self, runner: &Runner, msg: Message) {}}
/// # let runner = Runner::new(Foo);
/// // get a proxy to the `seq-kv` service
/// let kv = Kv::seq();
/// let result = kv.read(&runner, "MY_KEY");
/// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok`
/// // and the body will be `Some(Value)`.
/// assert!(result.is_ok() && result.unwrap().is_some());
/// ```
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult { pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {
let payload = mk_payload(&[("key", key.into())]); let payload = mk_payload(&[("key", key.into())]);
let body = Body::from_type("read").with_payload(payload); let body = Body::from_type("read").with_payload(payload);
@ -29,6 +49,7 @@ impl Kv {
Ok(Some(msg.body.payload.get("value").unwrap().to_owned())) Ok(Some(msg.body.payload.get("value").unwrap().to_owned()))
} }
/// The success value is `None`.
pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult { pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult {
let payload = mk_payload(&[("key", key.into()), ("value", val)]); let payload = mk_payload(&[("key", key.into()), ("value", val)]);
let body = Body::from_type("write").with_payload(payload); let body = Body::from_type("write").with_payload(payload);
@ -37,6 +58,7 @@ impl Kv {
Ok(None) Ok(None)
} }
/// The success value is `None`.
pub fn cas( pub fn cas(
&self, &self,
runner: &Runner, runner: &Runner,

View File

@ -49,6 +49,29 @@ impl Runner {
} }
} }
/// Start processing messages from stdin and sending them to your node. The `on_init` argument
/// is an optional callback that will be called with `&self` after the `init` message from
/// Maelstrom has been processed.
///
/// # Examples
///
/// ```no_run
/// use nebkor_maelstrom::{Body, Message, Node, Runner};
/// struct Foo;
/// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl here*/ }}
///
/// let runner = Runner::new(Foo);
///
/// let on_init = |rnr: &Runner| {
/// eprintln!("received the `init` message!");
/// let msg = Message { body: Body::from_type("ignore_me"), ..Default::default() };
/// // send `msg` to the node to be processed by its `handle()` method
/// rnr.get_backdoor().send(msg).unwrap();
/// };
/// let on_init = Box::new(on_init);
///
/// runner.run(Some(on_init));
/// ```
pub fn run(&self, on_init: Option<OnInit>) { pub fn run(&self, on_init: Option<OnInit>) {
let (stdin_tx, stdin_rx) = channel(); let (stdin_tx, stdin_rx) = channel();
thread::spawn(move || { thread::spawn(move || {
@ -78,26 +101,27 @@ impl Runner {
self.process_input(stdin_rx, on_init); self.process_input(stdin_rx, on_init);
} }
/// Get a Sender that will send Messages to the node as input. Useful for triggering periodic
/// behavior from a separate thread, or for sending a Message to the node from `on_init`. See
/// the `broadcast` example for a use of it.
pub fn get_backdoor(&self) -> Sender<Message> { pub fn get_backdoor(&self) -> Sender<Message> {
self.backdoor.get().unwrap().clone() self.backdoor.get().unwrap().clone()
} }
pub fn node_id(&self) -> String { pub fn node_id(&self) -> &str {
self.node_id.get().cloned().unwrap_or_default() self.node_id.get().unwrap()
} }
pub fn next_msg_id(&self) -> u64 { pub fn next_msg_id(&self) -> u64 {
self.msg_id.fetch_add(1, Ordering::SeqCst) self.msg_id.fetch_add(1, Ordering::SeqCst)
} }
pub fn cur_msg_id(&self) -> u64 { /// A list of all nodes in the network, including this one.
self.msg_id.load(Ordering::SeqCst)
}
pub fn nodes(&self) -> &[String] { pub fn nodes(&self) -> &[String] {
self.nodes.get().unwrap() self.nodes.get().unwrap()
} }
/// Construct a new `Message` from `body` and send it to `req.src`.
pub fn reply(&self, req: &Message, body: Body) { pub fn reply(&self, req: &Message, body: Body) {
let mut body = body; let mut body = body;
let dest = req.src.as_str(); let dest = req.src.as_str();
@ -106,11 +130,13 @@ impl Runner {
self.send(dest, body); self.send(dest, body);
} }
/// Construct a new `Message` from `body` and send it to `dest`.
pub fn send(&self, dest: &str, body: Body) { pub fn send(&self, dest: &str, body: Body) {
let msg = self.mk_msg(dest, body); let msg = self.mk_msg(dest, body);
self.outbound_tx.get().unwrap().send(msg).unwrap(); self.outbound_tx.get().unwrap().send(msg).unwrap();
} }
/// Returns a Receiver<Message> that will receive the reply from the request.
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
let msg = self.mk_msg(dest, body); let msg = self.mk_msg(dest, body);
let (tx, rx) = channel(); let (tx, rx) = channel();
@ -186,11 +212,9 @@ impl Runner {
if body.msg_id == 0 { if body.msg_id == 0 {
body.msg_id = self.next_msg_id(); body.msg_id = self.next_msg_id();
} }
Message { Message::from_dest(dest)
src: self.node_id().to_string(), .with_body(body)
dest: dest.to_string(), .with_src(self.node_id())
body,
}
} }
} }

View File

@ -11,6 +11,55 @@ pub struct Message {
pub body: Body, pub body: Body,
} }
impl Message {
/// `src` and `dest` will be empty.
pub fn from_body(body: Body) -> Self {
Message {
body,
..Default::default()
}
}
pub fn with_body(self, body: Body) -> Self {
let mut m = self;
m.body = body;
m
}
pub fn from_dest(dest: &str) -> Self {
Message {
dest: dest.to_string(),
..Default::default()
}
}
pub fn with_dest(self, dest: &str) -> Self {
let mut m = self;
m.dest = dest.to_string();
m
}
pub fn from_src(src: &str) -> Self {
Message {
src: src.to_string(),
..Default::default()
}
}
pub fn with_src(self, src: &str) -> Self {
let mut m = self;
m.src = src.to_string();
m
}
/// The Maelstrom type of a Body (and hence a Message) is just a string. This is for the sake of
/// ease of use for doing the Gossip Glomers challenges; this crate is not meant to be a real
/// network client framework.
pub fn typ(&self) -> &str {
self.body.typ.as_str()
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Body { pub struct Body {
#[serde(rename = "type")] #[serde(rename = "type")]
@ -53,23 +102,29 @@ impl Body {
b.payload = payload; b.payload = payload;
b b
} }
pub fn error(code: ErrorCode, in_reply_to: u64, text: Option<&str>) -> Self {
Body {
in_reply_to,
typ: "error".to_string(),
code: Some(code),
text: text.map(|t| t.to_string()),
..Default::default()
}
}
} }
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
Body::from_type("init_ok")
.with_msg_id(msg_id)
.with_in_reply_to(in_reply_to)
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum ErrorCode { pub enum ErrorCode {
Definite(DefiniteError), Definite(DefiniteError),
Indefinite(IndefiniteError), Indefinite(IndefiniteError),
} }
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] #[derive(
#[repr(u64)] Debug, Clone, Copy, Serialize_repr, Deserialize_repr, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
#[repr(u16)]
pub enum DefiniteError { pub enum DefiniteError {
NodeNotFound = 2, NodeNotFound = 2,
NotSupported = 10, NotSupported = 10,
@ -82,24 +137,15 @@ pub enum DefiniteError {
TxnConflict = 30, TxnConflict = 30,
} }
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] #[derive(
#[repr(u64)] Debug, Clone, Copy, Serialize_repr, Deserialize_repr, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
#[repr(u16)]
pub enum IndefiniteError { pub enum IndefiniteError {
Timeout = 0, Timeout = 0,
Crash = 13, Crash = 13,
} }
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
Body {
typ: "error".to_string(),
msg_id,
in_reply_to,
code: Some(code),
text: text.map(|t| t.to_string()),
payload: Default::default(),
}
}
#[allow(clippy::trivially_copy_pass_by_ref)] #[allow(clippy::trivially_copy_pass_by_ref)]
fn u64_zero_by_ref(num: &u64) -> bool { fn u64_zero_by_ref(num: &u64) -> bool {
*num == 0 *num == 0
@ -114,5 +160,9 @@ mod test {
let ec = ErrorCode::Definite(DefiniteError::Abort); let ec = ErrorCode::Definite(DefiniteError::Abort);
let e = serde_json::to_string(&ec).unwrap(); let e = serde_json::to_string(&ec).unwrap();
assert_eq!(&e, "14"); assert_eq!(&e, "14");
let s: ErrorCode = serde_json::from_str(&e).unwrap();
assert_eq!(s, ec);
let n: ErrorCode = serde_json::from_value(14u16.into()).unwrap();
assert_eq!(n, ec);
} }
} }