initial import from chatty-catties

This commit is contained in:
Joe Ardent 2024-06-04 17:27:52 -07:00
commit c6c05b41b4
8 changed files with 474 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
target/
store/
Cargo.lock

4
.rustfmt.toml Normal file
View file

@ -0,0 +1,4 @@
imports_granularity = "Crate"
group_imports = "StdExternalCrate"
wrap_comments = true
edition = "2021"

14
Cargo.toml Normal file
View file

@ -0,0 +1,14 @@
[package]
name = "nebkor-maelstrom"
edition = "2021"
version = "0.0.1"
license-file = "LICENSE.md"
readme = "README.md"
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."
repository = "https://git.kittencollective.com/nebkor/nebkor-maelstrom"
keywords = ["maelstrom", "glomers", "gossip", "distributed"]
[dependencies]
serde_json = "1"
serde = { version = "1", default-features = false, features = ["derive"] }
serde_repr = "0.1"

5
LICENSE.md Normal file
View file

@ -0,0 +1,5 @@
# The Chaos License (GLP)
This software is released under the terms of the Chaos License. In cases where the terms of the
license are unclear, refer to the [Fuck Around and Find Out
License](https://git.sr.ht/~boringcactus/fafol/tree/master/LICENSE-v0.2.md).

61
README.md Normal file
View file

@ -0,0 +1,61 @@
# A synchronous and simple Maelstrom crate
`nebkor-maelstreom` is a lean and simple synchronous library for writing
[Maelstrom](https://github.com/jepsen-io/maelstrom/tree/0186f398f96564a0453dda97c07daeac67c3d8d7)-compatible
distributed actors. It has three dependencies:
- serde
- serde_json
- serde_repr
For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program:
``` rust
use nebkor_maelstrom::{Body, Message, Node, Runner};
struct Echo;
impl Node for Echo {
fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = &msg.body.typ;
if typ.as_str() == "echo" {
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
runner.reply(&msg, body);
}
}
}
fn main() {
let node = Echo;
let runner = Runner::new(node);
runner.run(None);
}
```
## How to use
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
`send`, `reply`, and `rpc`.
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
Runner. The `run()` method takes an optional closure that will be run when the `init` Message is
received; see the
[broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/af6d2c0b2720669f91a758c8c5755a146a914be4/gg-broadcast/src/main.rs#L10-L30)
program for an example of that, where it spawns a thread to send periodic messages to the node.
## Design considerations
I wanted the client code to be as simple as possible, with the least amount of boilerplate. Using
`&mut self` as the receiver for the `handle()` method lets you easily mutate state in your node if
you need to, without the ceremony of `Rc<Mutex<>>` and the like. Eschewing `async` results in an
order of magnitude fewer dependencies, and the entire workspace (crate and clients) can be compiled
in a couple seconds.
## Acknowledgments
I straight-up stole the design of the IO/network system from
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
call. Thanks!

59
src/kv.rs Normal file
View file

@ -0,0 +1,59 @@
use serde_json::Value;
use crate::{check_err, mk_payload, Body, RpcResult, Runner};
#[derive(Debug, Default, Clone)]
pub struct Kv {
pub service: &'static str,
}
impl Kv {
pub fn seq() -> Self {
Kv { service: "seq-kv" }
}
pub fn lin() -> Self {
Kv { service: "lin-kv" }
}
pub fn lww() -> Self {
Kv { service: "lww-kv" }
}
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {
let payload = mk_payload(&[("key", key.into())]);
let body = Body::from_type("read").with_payload(payload);
let rx = runner.rpc(self.service, body);
let msg = rx.recv().unwrap();
check_err(&msg)?;
Ok(Some(msg.body.payload.get("value").unwrap().to_owned()))
}
pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult {
let payload = mk_payload(&[("key", key.into()), ("value", val)]);
let body = Body::from_type("write").with_payload(payload);
let msg = runner.rpc(self.service, body).recv().unwrap();
check_err(&msg)?;
Ok(None)
}
pub fn cas(
&self,
runner: &Runner,
key: &str,
from: Value,
to: Value,
create: bool,
) -> RpcResult {
let payload = mk_payload(&[
("key", key.into()),
("from", from),
("to", to),
("create_if_not_exists", create.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
let msg = runner.rpc(self.service, body).recv().unwrap();
check_err(&msg)?;
Ok(None)
}
}

210
src/lib.rs Normal file
View file

@ -0,0 +1,210 @@
use std::{
collections::HashMap,
io::{BufRead, Write},
sync::{
atomic::{AtomicU64, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, Mutex, OnceLock,
},
thread::{self},
};
use serde_json::Value;
pub mod protocol;
pub use protocol::{Body, ErrorCode, Message, Payload};
pub mod kv;
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
pub type OnInit = Box<dyn Fn(&Runner)>;
pub type RpcPromise = Receiver<Message>;
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
pub trait Node {
fn handle(&mut self, runner: &Runner, msg: Message);
}
pub struct Runner {
node: NodeyNodeFace,
node_id: OnceLock<String>,
nodes: OnceLock<Vec<String>>,
backdoor: OnceLock<Sender<Message>>,
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
outbound_tx: OnceLock<Sender<Message>>,
msg_id: AtomicU64,
}
impl Runner {
pub fn new<N: Node + 'static>(node: N) -> Self {
let node = Arc::new(Mutex::new(node));
Runner {
node,
nodes: OnceLock::new(),
node_id: OnceLock::new(),
backdoor: OnceLock::new(),
outbound_tx: OnceLock::new(),
promises: Default::default(),
msg_id: AtomicU64::new(1),
}
}
pub fn run(&self, on_init: Option<OnInit>) {
let (stdin_tx, stdin_rx) = channel();
thread::spawn(move || {
let stdin = std::io::stdin().lock().lines();
for line in stdin {
stdin_tx.send(line.unwrap()).unwrap();
}
});
let (stdout_tx, stdout_rx) = channel();
thread::spawn(move || {
let mut stdout = std::io::stdout().lock();
for msg in stdout_rx {
writeln!(&mut stdout, "{msg}").unwrap();
}
});
let (outbound_tx, outbound_rx) = channel();
let _ = self.outbound_tx.get_or_init(|| outbound_tx);
thread::spawn(move || {
while let Ok(msg) = outbound_rx.recv() {
let msg = serde_json::to_string(&msg).unwrap();
stdout_tx.send(msg).unwrap();
}
});
self.process_input(stdin_rx, on_init);
}
pub fn get_backdoor(&self) -> Sender<Message> {
self.backdoor.get().unwrap().clone()
}
pub fn node_id(&self) -> String {
self.node_id.get().cloned().unwrap_or_default()
}
pub fn next_msg_id(&self) -> u64 {
self.msg_id.fetch_add(1, Ordering::SeqCst)
}
pub fn cur_msg_id(&self) -> u64 {
self.msg_id.load(Ordering::SeqCst)
}
pub fn nodes(&self) -> &[String] {
self.nodes.get().unwrap()
}
pub fn reply(&self, req: &Message, body: Body) {
let mut body = body;
let dest = req.src.as_str();
let in_reply_to = req.body.msg_id;
body.in_reply_to = in_reply_to;
self.send(dest, body);
}
pub fn send(&self, dest: &str, body: Body) {
let msg = self.mk_msg(dest, body);
self.outbound_tx.get().unwrap().send(msg).unwrap();
}
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
let msg = self.mk_msg(dest, body);
let (tx, rx) = channel();
{
let msg_id = msg.body.msg_id;
let mut g = self.promises.lock().unwrap();
g.insert(msg_id, tx);
}
self.outbound_tx.get().unwrap().send(msg).unwrap();
rx
}
// internal methods
fn init(&self, msg: &Message) {
let node_id = msg
.body
.payload
.get("node_id")
.unwrap()
.as_str()
.unwrap()
.to_owned();
let nodes = msg
.body
.payload
.get("node_ids")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|s| s.as_str().unwrap().to_string())
.collect();
let _ = self.node_id.get_or_init(|| node_id);
let _ = self.nodes.get_or_init(|| nodes);
}
fn process_input(&self, stdin_rx: Receiver<String>, on_init: Option<OnInit>) {
let (json_tx, json_rx) = channel();
let _ = self.backdoor.get_or_init(|| json_tx.clone());
let proms = self.promises.clone();
thread::spawn(move || {
for line in stdin_rx {
let msg: Message = serde_json::from_str(&line).unwrap();
let irt = msg.body.in_reply_to;
if let Some(tx) = proms.lock().unwrap().remove(&irt) {
tx.send(msg).unwrap();
} else {
json_tx.send(msg).unwrap();
}
}
});
let msg = json_rx.recv().unwrap();
{
self.init(&msg);
let body = Body::from_type("init_ok");
self.reply(&msg, body);
if let Some(on_init) = on_init {
on_init(self);
}
}
let mut node = self.node.lock().unwrap();
for msg in json_rx {
node.handle(self, msg);
}
}
fn mk_msg(&self, dest: &str, body: Body) -> Message {
let mut body = body;
if body.msg_id == 0 {
body.msg_id = self.next_msg_id();
}
Message {
src: self.node_id().to_string(),
dest: dest.to_string(),
body,
}
}
}
pub fn check_err(msg: &Message) -> RpcResult {
if msg.body.typ.as_str() == "error" {
let error = msg.body.code.unwrap();
return Err(error);
}
Ok(None)
}
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
payload
.iter()
.map(|p| (p.0.to_string(), p.1.clone()))
.collect()
}

118
src/protocol.rs Normal file
View file

@ -0,0 +1,118 @@
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use serde_repr::{Deserialize_repr, Serialize_repr};
pub type Payload = Map<String, Value>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Message {
pub src: String,
pub dest: String,
pub body: Body,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Body {
#[serde(rename = "type")]
pub typ: String,
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
pub msg_id: u64,
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
pub in_reply_to: u64,
#[serde(flatten)]
pub payload: Payload,
// the following are for the case of errors
#[serde(default, skip_serializing_if = "Option::is_none")]
pub code: Option<ErrorCode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub text: Option<String>,
}
impl Body {
pub fn from_type(typ: &str) -> Self {
Body {
typ: typ.to_string(),
..Default::default()
}
}
pub fn with_msg_id(self, msg_id: u64) -> Self {
let mut b = self;
b.msg_id = msg_id;
b
}
pub fn with_in_reply_to(self, in_reply_to: u64) -> Self {
let mut b = self;
b.in_reply_to = in_reply_to;
b
}
pub fn with_payload(self, payload: Payload) -> Self {
let mut b = self;
b.payload = payload;
b
}
}
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
Body::from_type("init_ok")
.with_msg_id(msg_id)
.with_in_reply_to(in_reply_to)
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ErrorCode {
Definite(DefiniteError),
Indefinite(IndefiniteError),
}
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
#[repr(u64)]
pub enum DefiniteError {
NodeNotFound = 2,
NotSupported = 10,
TemporarilyUnavailable = 11,
MalformedRequest = 12,
Abort = 14,
KeyNotFound = 20,
KeyAlreadyExists = 21,
PreconditionFailed = 22,
TxnConflict = 30,
}
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
#[repr(u64)]
pub enum IndefiniteError {
Timeout = 0,
Crash = 13,
}
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
Body {
typ: "error".to_string(),
msg_id,
in_reply_to,
code: Some(code),
text: text.map(|t| t.to_string()),
payload: Default::default(),
}
}
#[allow(clippy::trivially_copy_pass_by_ref)]
fn u64_zero_by_ref(num: &u64) -> bool {
*num == 0
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn error_codes() {
let ec = ErrorCode::Definite(DefiniteError::Abort);
let e = serde_json::to_string(&ec).unwrap();
assert_eq!(&e, "14");
}
}