diff --git a/Cargo.lock b/Cargo.lock index f7b1a4a..e411b96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,8 @@ checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "nebkor-maelstrom" version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83dd99cc42f1882a9a269091b25a3afb88f6237ed56bc255659e49aaf9e89fc3" dependencies = [ "serde", "serde_json", @@ -82,9 +84,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.83" +version = "1.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b33eb56c327dec362a9e55b3ad14f9d2f0904fb5a5b03b513ab5465399e9f43" +checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" dependencies = [ "unicode-ident", ] @@ -136,18 +138,18 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "serde" -version = "1.0.202" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.202" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", @@ -178,9 +180,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.65" +version = "2.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2863d96a84c6439701d7a38f9de935ec562c8832cc55d1dde0f513b52fad106" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index bb1b15b..5a26b77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom", "gg-g_counter"] +members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter"] resolver = "2" [workspace.package] @@ -10,4 +10,4 @@ license-file = "LICENSE.md" [workspace.dependencies] serde_json = "1" rand = "0.8" - +nebkor-maelstrom = "*" diff --git a/gg-broadcast/Cargo.toml b/gg-broadcast/Cargo.toml index a713c61..a9397de 100644 --- a/gg-broadcast/Cargo.toml +++ b/gg-broadcast/Cargo.toml @@ -7,5 +7,5 @@ license-file.workspace = true [dependencies] serde_json.workspace = true -nebkor-maelstrom = { path = "../nebkor-maelstrom" } +nebkor-maelstrom.workspace = true rand.workspace = true diff --git a/gg-echo/Cargo.toml b/gg-echo/Cargo.toml index 6496998..eccf335 100644 --- a/gg-echo/Cargo.toml +++ b/gg-echo/Cargo.toml @@ -6,4 +6,4 @@ authors.workspace = true [dependencies] serde_json.workspace = true -nebkor-maelstrom = { path = "../nebkor-maelstrom" } +nebkor-maelstrom.workspace = true diff --git a/gg-g_counter/Cargo.toml b/gg-g_counter/Cargo.toml index eb83a0d..6bd7c4c 100644 --- a/gg-g_counter/Cargo.toml +++ b/gg-g_counter/Cargo.toml @@ -7,6 +7,6 @@ license-file.workspace = true [dependencies] serde_json.workspace = true -nebkor-maelstrom = { path = "../nebkor-maelstrom" } +nebkor-maelstrom.workspace = true rand.workspace = true diff --git a/gg-uid/Cargo.toml b/gg-uid/Cargo.toml index 87102f9..1e43093 100644 --- a/gg-uid/Cargo.toml +++ b/gg-uid/Cargo.toml @@ -7,4 +7,4 @@ license-file.workspace = true [dependencies] serde_json.workspace = true -nebkor-maelstrom = { path = "../nebkor-maelstrom" } +nebkor-maelstrom.workspace = true diff --git a/nebkor-maelstrom/Cargo.toml b/nebkor-maelstrom/Cargo.toml deleted file mode 100644 index 456c9dc..0000000 --- a/nebkor-maelstrom/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "nebkor-maelstrom" -edition = "2021" -version.workspace = true -authors.workspace = true -license-file.workspace = true - -[dependencies] -serde_json.workspace = true -serde = { version = "1", default-features = false, features = ["derive"] } -serde_repr = "0.1" diff --git a/nebkor-maelstrom/README.md b/nebkor-maelstrom/README.md deleted file mode 100644 index dc03b8d..0000000 --- a/nebkor-maelstrom/README.md +++ /dev/null @@ -1,66 +0,0 @@ -# A synchronous and simple Maelstrom crate - -`nebkor-maelstreom` is a lean and simple synchronous library for writing -[Maelstrom](https://github.com/jepsen-io/maelstrom/tree/0186f398f96564a0453dda97c07daeac67c3d8d7)-compatible -distributed actors. It has three dependencies: - - - serde - - serde_json - - serde_repr - -For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program: - -``` rust -use nebkor_maelstrom::{Body, Message, Node, Runner}; - -struct Echo; - -impl Node for Echo { - fn handle(&mut self, runner: &Runner, msg: Message) { - let typ = &msg.body.typ; - if typ.as_str() == "echo" { - let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); - runner.reply(&msg, body); - } - } -} - -fn main() { - let node = Echo; - - let runner = Runner::new(node); - - runner.run(None); -} -``` - -## How to use - -Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method, -`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like -`send`, `reply`, and `rpc`. - -In your main function, instantiate that struct and pass that into `Runner::new()` to get a -Runner. The `run()` method takes an optional closure that will be run when the `init` Message is -received; see the -[broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/fff7fdc2d52f7d4c2d4d9c581ea16cdf0e1e3f30/gg-broadcast/src/main.rs#L18-L33) -program for an example of that, where it spawns a thread to send periodic messages to the node. - -## Design considerations - -I wanted the client code to be as simple as possible, with the least amount of boilerplate. Using -`&mut self` as the receiver for the `handle()` method lets you easily mutate state in your node if -you need to, without the ceremony of `Rc>` and the like. Eschewing `async` results in an -order of magnitude fewer dependencies, and the entire workspace (crate and clients) can be compiled -in a couple seconds. - -## Acknowledgments - -I straight-up stole the design of the IO/network system from -[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC -call. Thanks! - - -## TODO - - - add error handling. diff --git a/nebkor-maelstrom/src/kv.rs b/nebkor-maelstrom/src/kv.rs deleted file mode 100644 index dd841e6..0000000 --- a/nebkor-maelstrom/src/kv.rs +++ /dev/null @@ -1,59 +0,0 @@ -use serde_json::Value; - -use crate::{check_err, mk_payload, Body, RpcResult, Runner}; - -#[derive(Debug, Default, Clone)] -pub struct Kv { - pub service: &'static str, -} - -impl Kv { - pub fn seq() -> Self { - Kv { service: "seq-kv" } - } - - pub fn lin() -> Self { - Kv { service: "lin-kv" } - } - - pub fn lww() -> Self { - Kv { service: "lww-kv" } - } - - pub fn read(&self, runner: &Runner, key: &str) -> RpcResult { - let payload = mk_payload(&[("key", key.into())]); - let body = Body::from_type("read").with_payload(payload); - let rx = runner.rpc(self.service, body); - let msg = rx.recv().unwrap(); - check_err(&msg)?; - Ok(Some(msg.body.payload.get("value").unwrap().to_owned())) - } - - pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult { - let payload = mk_payload(&[("key", key.into()), ("value", val)]); - let body = Body::from_type("write").with_payload(payload); - let msg = runner.rpc(self.service, body).recv().unwrap(); - check_err(&msg)?; - Ok(None) - } - - pub fn cas( - &self, - runner: &Runner, - key: &str, - from: Value, - to: Value, - create: bool, - ) -> RpcResult { - let payload = mk_payload(&[ - ("key", key.into()), - ("from", from), - ("to", to), - ("create_if_not_exists", create.into()), - ]); - let body = Body::from_type("cas").with_payload(payload); - let msg = runner.rpc(self.service, body).recv().unwrap(); - check_err(&msg)?; - Ok(None) - } -} diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs deleted file mode 100644 index 45ab037..0000000 --- a/nebkor-maelstrom/src/lib.rs +++ /dev/null @@ -1,210 +0,0 @@ -use std::{ - collections::HashMap, - io::{BufRead, Write}, - sync::{ - atomic::{AtomicU64, Ordering}, - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, OnceLock, - }, - thread::{self}, -}; - -use serde_json::Value; - -pub mod protocol; -pub use protocol::{Body, ErrorCode, Message, Payload}; - -pub mod kv; - -pub type NodeyNodeFace = Arc>; -pub type OnInit = Box; -pub type RpcPromise = Receiver; -pub type RpcResult = std::result::Result, ErrorCode>; - -pub trait Node { - fn handle(&mut self, runner: &Runner, msg: Message); -} - -pub struct Runner { - node: NodeyNodeFace, - node_id: OnceLock, - nodes: OnceLock>, - backdoor: OnceLock>, - promises: Arc>>>, - outbound_tx: OnceLock>, - msg_id: AtomicU64, -} - -impl Runner { - pub fn new(node: N) -> Self { - let node = Arc::new(Mutex::new(node)); - Runner { - node, - nodes: OnceLock::new(), - node_id: OnceLock::new(), - backdoor: OnceLock::new(), - outbound_tx: OnceLock::new(), - promises: Default::default(), - msg_id: AtomicU64::new(1), - } - } - - pub fn run(&self, on_init: Option) { - let (stdin_tx, stdin_rx) = channel(); - thread::spawn(move || { - let stdin = std::io::stdin().lock().lines(); - for line in stdin { - stdin_tx.send(line.unwrap()).unwrap(); - } - }); - - let (stdout_tx, stdout_rx) = channel(); - thread::spawn(move || { - let mut stdout = std::io::stdout().lock(); - for msg in stdout_rx { - writeln!(&mut stdout, "{msg}").unwrap(); - } - }); - - let (outbound_tx, outbound_rx) = channel(); - let _ = self.outbound_tx.get_or_init(|| outbound_tx); - thread::spawn(move || { - while let Ok(msg) = outbound_rx.recv() { - let msg = serde_json::to_string(&msg).unwrap(); - stdout_tx.send(msg).unwrap(); - } - }); - - self.process_input(stdin_rx, on_init); - } - - pub fn get_backdoor(&self) -> Sender { - self.backdoor.get().unwrap().clone() - } - - pub fn node_id(&self) -> String { - self.node_id.get().cloned().unwrap_or_default() - } - - pub fn next_msg_id(&self) -> u64 { - self.msg_id.fetch_add(1, Ordering::SeqCst) - } - - pub fn cur_msg_id(&self) -> u64 { - self.msg_id.load(Ordering::SeqCst) - } - - pub fn nodes(&self) -> &[String] { - self.nodes.get().unwrap() - } - - pub fn reply(&self, req: &Message, body: Body) { - let mut body = body; - let dest = req.src.as_str(); - let in_reply_to = req.body.msg_id; - body.in_reply_to = in_reply_to; - self.send(dest, body); - } - - pub fn send(&self, dest: &str, body: Body) { - let msg = self.mk_msg(dest, body); - self.outbound_tx.get().unwrap().send(msg).unwrap(); - } - - pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { - let msg = self.mk_msg(dest, body); - let (tx, rx) = channel(); - { - let msg_id = msg.body.msg_id; - let mut g = self.promises.lock().unwrap(); - g.insert(msg_id, tx); - } - self.outbound_tx.get().unwrap().send(msg).unwrap(); - rx - } - - // internal methods - fn init(&self, msg: &Message) { - let node_id = msg - .body - .payload - .get("node_id") - .unwrap() - .as_str() - .unwrap() - .to_owned(); - - let nodes = msg - .body - .payload - .get("node_ids") - .unwrap() - .as_array() - .unwrap() - .iter() - .map(|s| s.as_str().unwrap().to_string()) - .collect(); - - let _ = self.node_id.get_or_init(|| node_id); - let _ = self.nodes.get_or_init(|| nodes); - } - - fn process_input(&self, stdin_rx: Receiver, on_init: Option) { - let (json_tx, json_rx) = channel(); - let _ = self.backdoor.get_or_init(|| json_tx.clone()); - let proms = self.promises.clone(); - thread::spawn(move || { - for line in stdin_rx { - let msg: Message = serde_json::from_str(&line).unwrap(); - let irt = msg.body.in_reply_to; - if let Some(tx) = proms.lock().unwrap().remove(&irt) { - tx.send(msg).unwrap(); - } else { - json_tx.send(msg).unwrap(); - } - } - }); - - let msg = json_rx.recv().unwrap(); - { - self.init(&msg); - let body = Body::from_type("init_ok"); - self.reply(&msg, body); - if let Some(on_init) = on_init { - on_init(self); - } - } - - let mut node = self.node.lock().unwrap(); - for msg in json_rx { - node.handle(self, msg); - } - } - - fn mk_msg(&self, dest: &str, body: Body) -> Message { - let mut body = body; - if body.msg_id == 0 { - body.msg_id = self.next_msg_id(); - } - Message { - src: self.node_id().to_string(), - dest: dest.to_string(), - body, - } - } -} - -pub fn check_err(msg: &Message) -> RpcResult { - if msg.body.typ.as_str() == "error" { - let error = msg.body.code.unwrap(); - return Err(error); - } - Ok(None) -} - -pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { - payload - .iter() - .map(|p| (p.0.to_string(), p.1.clone())) - .collect() -} diff --git a/nebkor-maelstrom/src/protocol.rs b/nebkor-maelstrom/src/protocol.rs deleted file mode 100644 index 6f35178..0000000 --- a/nebkor-maelstrom/src/protocol.rs +++ /dev/null @@ -1,118 +0,0 @@ -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; -use serde_repr::{Deserialize_repr, Serialize_repr}; - -pub type Payload = Map; - -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct Message { - pub src: String, - pub dest: String, - pub body: Body, -} - -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct Body { - #[serde(rename = "type")] - pub typ: String, - #[serde(default, skip_serializing_if = "u64_zero_by_ref")] - pub msg_id: u64, - #[serde(default, skip_serializing_if = "u64_zero_by_ref")] - pub in_reply_to: u64, - #[serde(flatten)] - pub payload: Payload, - // the following are for the case of errors - #[serde(default, skip_serializing_if = "Option::is_none")] - pub code: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub text: Option, -} - -impl Body { - pub fn from_type(typ: &str) -> Self { - Body { - typ: typ.to_string(), - ..Default::default() - } - } - - pub fn with_msg_id(self, msg_id: u64) -> Self { - let mut b = self; - b.msg_id = msg_id; - b - } - - pub fn with_in_reply_to(self, in_reply_to: u64) -> Self { - let mut b = self; - b.in_reply_to = in_reply_to; - b - } - - pub fn with_payload(self, payload: Payload) -> Self { - let mut b = self; - b.payload = payload; - b - } -} - -pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body { - Body::from_type("init_ok") - .with_msg_id(msg_id) - .with_in_reply_to(in_reply_to) -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -#[serde(untagged)] -pub enum ErrorCode { - Definite(DefiniteError), - Indefinite(IndefiniteError), -} - -#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] -#[repr(u64)] -pub enum DefiniteError { - NodeNotFound = 2, - NotSupported = 10, - TemporarilyUnavailable = 11, - MalformedRequest = 12, - Abort = 14, - KeyNotFound = 20, - KeyAlreadyExists = 21, - PreconditionFailed = 22, - TxnConflict = 30, -} - -#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] -#[repr(u64)] -pub enum IndefiniteError { - Timeout = 0, - Crash = 13, -} - -pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body { - Body { - typ: "error".to_string(), - msg_id, - in_reply_to, - code: Some(code), - text: text.map(|t| t.to_string()), - payload: Default::default(), - } -} - -#[allow(clippy::trivially_copy_pass_by_ref)] -fn u64_zero_by_ref(num: &u64) -> bool { - *num == 0 -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn error_codes() { - let ec = ErrorCode::Definite(DefiniteError::Abort); - let e = serde_json::to_string(&ec).unwrap(); - assert_eq!(&e, "14"); - } -}