From c6c05b41b450c4a98e24ac4cec26038386670424 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Tue, 4 Jun 2024 17:27:52 -0700 Subject: [PATCH] initial import from chatty-catties --- .gitignore | 3 + .rustfmt.toml | 4 + Cargo.toml | 14 ++++ LICENSE.md | 5 ++ README.md | 61 ++++++++++++++ src/kv.rs | 59 ++++++++++++++ src/lib.rs | 210 ++++++++++++++++++++++++++++++++++++++++++++++++ src/protocol.rs | 118 +++++++++++++++++++++++++++ 8 files changed, 474 insertions(+) create mode 100644 .gitignore create mode 100644 .rustfmt.toml create mode 100644 Cargo.toml create mode 100644 LICENSE.md create mode 100644 README.md create mode 100644 src/kv.rs create mode 100644 src/lib.rs create mode 100644 src/protocol.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..25d4747 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +target/ +store/ +Cargo.lock diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..4c8d0e1 --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,4 @@ +imports_granularity = "Crate" +group_imports = "StdExternalCrate" +wrap_comments = true +edition = "2021" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..eda4254 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "nebkor-maelstrom" +edition = "2021" +version = "0.0.1" +license-file = "LICENSE.md" +readme = "README.md" +description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients." +repository = "https://git.kittencollective.com/nebkor/nebkor-maelstrom" +keywords = ["maelstrom", "glomers", "gossip", "distributed"] + +[dependencies] +serde_json = "1" +serde = { version = "1", default-features = false, features = ["derive"] } +serde_repr = "0.1" diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..589cfb6 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,5 @@ +# The Chaos License (GLP) + +This software is released under the terms of the Chaos License. In cases where the terms of the +license are unclear, refer to the [Fuck Around and Find Out +License](https://git.sr.ht/~boringcactus/fafol/tree/master/LICENSE-v0.2.md). diff --git a/README.md b/README.md new file mode 100644 index 0000000..ede6dbf --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# A synchronous and simple Maelstrom crate + +`nebkor-maelstreom` is a lean and simple synchronous library for writing +[Maelstrom](https://github.com/jepsen-io/maelstrom/tree/0186f398f96564a0453dda97c07daeac67c3d8d7)-compatible +distributed actors. It has three dependencies: + + - serde + - serde_json + - serde_repr + +For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program: + +``` rust +use nebkor_maelstrom::{Body, Message, Node, Runner}; + +struct Echo; + +impl Node for Echo { + fn handle(&mut self, runner: &Runner, msg: Message) { + let typ = &msg.body.typ; + if typ.as_str() == "echo" { + let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); + runner.reply(&msg, body); + } + } +} + +fn main() { + let node = Echo; + + let runner = Runner::new(node); + + runner.run(None); +} +``` + +## How to use + +Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method, +`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like +`send`, `reply`, and `rpc`. + +In your main function, instantiate that struct and pass that into `Runner::new()` to get a +Runner. The `run()` method takes an optional closure that will be run when the `init` Message is +received; see the +[broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/af6d2c0b2720669f91a758c8c5755a146a914be4/gg-broadcast/src/main.rs#L10-L30) +program for an example of that, where it spawns a thread to send periodic messages to the node. + +## Design considerations + +I wanted the client code to be as simple as possible, with the least amount of boilerplate. Using +`&mut self` as the receiver for the `handle()` method lets you easily mutate state in your node if +you need to, without the ceremony of `Rc>` and the like. Eschewing `async` results in an +order of magnitude fewer dependencies, and the entire workspace (crate and clients) can be compiled +in a couple seconds. + +## Acknowledgments + +I straight-up stole the design of the IO/network system from +[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC +call. Thanks! diff --git a/src/kv.rs b/src/kv.rs new file mode 100644 index 0000000..dd841e6 --- /dev/null +++ b/src/kv.rs @@ -0,0 +1,59 @@ +use serde_json::Value; + +use crate::{check_err, mk_payload, Body, RpcResult, Runner}; + +#[derive(Debug, Default, Clone)] +pub struct Kv { + pub service: &'static str, +} + +impl Kv { + pub fn seq() -> Self { + Kv { service: "seq-kv" } + } + + pub fn lin() -> Self { + Kv { service: "lin-kv" } + } + + pub fn lww() -> Self { + Kv { service: "lww-kv" } + } + + pub fn read(&self, runner: &Runner, key: &str) -> RpcResult { + let payload = mk_payload(&[("key", key.into())]); + let body = Body::from_type("read").with_payload(payload); + let rx = runner.rpc(self.service, body); + let msg = rx.recv().unwrap(); + check_err(&msg)?; + Ok(Some(msg.body.payload.get("value").unwrap().to_owned())) + } + + pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult { + let payload = mk_payload(&[("key", key.into()), ("value", val)]); + let body = Body::from_type("write").with_payload(payload); + let msg = runner.rpc(self.service, body).recv().unwrap(); + check_err(&msg)?; + Ok(None) + } + + pub fn cas( + &self, + runner: &Runner, + key: &str, + from: Value, + to: Value, + create: bool, + ) -> RpcResult { + let payload = mk_payload(&[ + ("key", key.into()), + ("from", from), + ("to", to), + ("create_if_not_exists", create.into()), + ]); + let body = Body::from_type("cas").with_payload(payload); + let msg = runner.rpc(self.service, body).recv().unwrap(); + check_err(&msg)?; + Ok(None) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..45ab037 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,210 @@ +use std::{ + collections::HashMap, + io::{BufRead, Write}, + sync::{ + atomic::{AtomicU64, Ordering}, + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, OnceLock, + }, + thread::{self}, +}; + +use serde_json::Value; + +pub mod protocol; +pub use protocol::{Body, ErrorCode, Message, Payload}; + +pub mod kv; + +pub type NodeyNodeFace = Arc>; +pub type OnInit = Box; +pub type RpcPromise = Receiver; +pub type RpcResult = std::result::Result, ErrorCode>; + +pub trait Node { + fn handle(&mut self, runner: &Runner, msg: Message); +} + +pub struct Runner { + node: NodeyNodeFace, + node_id: OnceLock, + nodes: OnceLock>, + backdoor: OnceLock>, + promises: Arc>>>, + outbound_tx: OnceLock>, + msg_id: AtomicU64, +} + +impl Runner { + pub fn new(node: N) -> Self { + let node = Arc::new(Mutex::new(node)); + Runner { + node, + nodes: OnceLock::new(), + node_id: OnceLock::new(), + backdoor: OnceLock::new(), + outbound_tx: OnceLock::new(), + promises: Default::default(), + msg_id: AtomicU64::new(1), + } + } + + pub fn run(&self, on_init: Option) { + let (stdin_tx, stdin_rx) = channel(); + thread::spawn(move || { + let stdin = std::io::stdin().lock().lines(); + for line in stdin { + stdin_tx.send(line.unwrap()).unwrap(); + } + }); + + let (stdout_tx, stdout_rx) = channel(); + thread::spawn(move || { + let mut stdout = std::io::stdout().lock(); + for msg in stdout_rx { + writeln!(&mut stdout, "{msg}").unwrap(); + } + }); + + let (outbound_tx, outbound_rx) = channel(); + let _ = self.outbound_tx.get_or_init(|| outbound_tx); + thread::spawn(move || { + while let Ok(msg) = outbound_rx.recv() { + let msg = serde_json::to_string(&msg).unwrap(); + stdout_tx.send(msg).unwrap(); + } + }); + + self.process_input(stdin_rx, on_init); + } + + pub fn get_backdoor(&self) -> Sender { + self.backdoor.get().unwrap().clone() + } + + pub fn node_id(&self) -> String { + self.node_id.get().cloned().unwrap_or_default() + } + + pub fn next_msg_id(&self) -> u64 { + self.msg_id.fetch_add(1, Ordering::SeqCst) + } + + pub fn cur_msg_id(&self) -> u64 { + self.msg_id.load(Ordering::SeqCst) + } + + pub fn nodes(&self) -> &[String] { + self.nodes.get().unwrap() + } + + pub fn reply(&self, req: &Message, body: Body) { + let mut body = body; + let dest = req.src.as_str(); + let in_reply_to = req.body.msg_id; + body.in_reply_to = in_reply_to; + self.send(dest, body); + } + + pub fn send(&self, dest: &str, body: Body) { + let msg = self.mk_msg(dest, body); + self.outbound_tx.get().unwrap().send(msg).unwrap(); + } + + pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { + let msg = self.mk_msg(dest, body); + let (tx, rx) = channel(); + { + let msg_id = msg.body.msg_id; + let mut g = self.promises.lock().unwrap(); + g.insert(msg_id, tx); + } + self.outbound_tx.get().unwrap().send(msg).unwrap(); + rx + } + + // internal methods + fn init(&self, msg: &Message) { + let node_id = msg + .body + .payload + .get("node_id") + .unwrap() + .as_str() + .unwrap() + .to_owned(); + + let nodes = msg + .body + .payload + .get("node_ids") + .unwrap() + .as_array() + .unwrap() + .iter() + .map(|s| s.as_str().unwrap().to_string()) + .collect(); + + let _ = self.node_id.get_or_init(|| node_id); + let _ = self.nodes.get_or_init(|| nodes); + } + + fn process_input(&self, stdin_rx: Receiver, on_init: Option) { + let (json_tx, json_rx) = channel(); + let _ = self.backdoor.get_or_init(|| json_tx.clone()); + let proms = self.promises.clone(); + thread::spawn(move || { + for line in stdin_rx { + let msg: Message = serde_json::from_str(&line).unwrap(); + let irt = msg.body.in_reply_to; + if let Some(tx) = proms.lock().unwrap().remove(&irt) { + tx.send(msg).unwrap(); + } else { + json_tx.send(msg).unwrap(); + } + } + }); + + let msg = json_rx.recv().unwrap(); + { + self.init(&msg); + let body = Body::from_type("init_ok"); + self.reply(&msg, body); + if let Some(on_init) = on_init { + on_init(self); + } + } + + let mut node = self.node.lock().unwrap(); + for msg in json_rx { + node.handle(self, msg); + } + } + + fn mk_msg(&self, dest: &str, body: Body) -> Message { + let mut body = body; + if body.msg_id == 0 { + body.msg_id = self.next_msg_id(); + } + Message { + src: self.node_id().to_string(), + dest: dest.to_string(), + body, + } + } +} + +pub fn check_err(msg: &Message) -> RpcResult { + if msg.body.typ.as_str() == "error" { + let error = msg.body.code.unwrap(); + return Err(error); + } + Ok(None) +} + +pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { + payload + .iter() + .map(|p| (p.0.to_string(), p.1.clone())) + .collect() +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..6f35178 --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,118 @@ +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use serde_repr::{Deserialize_repr, Serialize_repr}; + +pub type Payload = Map; + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct Message { + pub src: String, + pub dest: String, + pub body: Body, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct Body { + #[serde(rename = "type")] + pub typ: String, + #[serde(default, skip_serializing_if = "u64_zero_by_ref")] + pub msg_id: u64, + #[serde(default, skip_serializing_if = "u64_zero_by_ref")] + pub in_reply_to: u64, + #[serde(flatten)] + pub payload: Payload, + // the following are for the case of errors + #[serde(default, skip_serializing_if = "Option::is_none")] + pub code: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub text: Option, +} + +impl Body { + pub fn from_type(typ: &str) -> Self { + Body { + typ: typ.to_string(), + ..Default::default() + } + } + + pub fn with_msg_id(self, msg_id: u64) -> Self { + let mut b = self; + b.msg_id = msg_id; + b + } + + pub fn with_in_reply_to(self, in_reply_to: u64) -> Self { + let mut b = self; + b.in_reply_to = in_reply_to; + b + } + + pub fn with_payload(self, payload: Payload) -> Self { + let mut b = self; + b.payload = payload; + b + } +} + +pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body { + Body::from_type("init_ok") + .with_msg_id(msg_id) + .with_in_reply_to(in_reply_to) +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(untagged)] +pub enum ErrorCode { + Definite(DefiniteError), + Indefinite(IndefiniteError), +} + +#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] +#[repr(u64)] +pub enum DefiniteError { + NodeNotFound = 2, + NotSupported = 10, + TemporarilyUnavailable = 11, + MalformedRequest = 12, + Abort = 14, + KeyNotFound = 20, + KeyAlreadyExists = 21, + PreconditionFailed = 22, + TxnConflict = 30, +} + +#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] +#[repr(u64)] +pub enum IndefiniteError { + Timeout = 0, + Crash = 13, +} + +pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body { + Body { + typ: "error".to_string(), + msg_id, + in_reply_to, + code: Some(code), + text: text.map(|t| t.to_string()), + payload: Default::default(), + } +} + +#[allow(clippy::trivially_copy_pass_by_ref)] +fn u64_zero_by_ref(num: &u64) -> bool { + *num == 0 +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn error_codes() { + let ec = ErrorCode::Definite(DefiniteError::Abort); + let e = serde_json::to_string(&ec).unwrap(); + assert_eq!(&e, "14"); + } +}