use the crates.io version of nebkor-maelstrom
This commit is contained in:
parent
af6d2c0b27
commit
a49ddffa56
11 changed files with 16 additions and 478 deletions
18
Cargo.lock
generated
18
Cargo.lock
generated
|
@ -68,6 +68,8 @@ checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nebkor-maelstrom"
|
name = "nebkor-maelstrom"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "83dd99cc42f1882a9a269091b25a3afb88f6237ed56bc255659e49aaf9e89fc3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -82,9 +84,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.83"
|
version = "1.0.85"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0b33eb56c327dec362a9e55b3ad14f9d2f0904fb5a5b03b513ab5465399e9f43"
|
checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
@ -136,18 +138,18 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.202"
|
version = "1.0.203"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395"
|
checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.202"
|
version = "1.0.203"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
|
checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -178,9 +180,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "2.0.65"
|
version = "2.0.66"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d2863d96a84c6439701d7a38f9de935ec562c8832cc55d1dde0f513b52fad106"
|
checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom", "gg-g_counter"]
|
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
@ -10,4 +10,4 @@ license-file = "LICENSE.md"
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
nebkor-maelstrom = "*"
|
||||||
|
|
|
@ -7,5 +7,5 @@ license-file.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
nebkor-maelstrom.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
|
|
|
@ -6,4 +6,4 @@ authors.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
nebkor-maelstrom.workspace = true
|
||||||
|
|
|
@ -7,6 +7,6 @@ license-file.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
nebkor-maelstrom.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
|
|
||||||
|
|
|
@ -7,4 +7,4 @@ license-file.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
nebkor-maelstrom.workspace = true
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "nebkor-maelstrom"
|
|
||||||
edition = "2021"
|
|
||||||
version.workspace = true
|
|
||||||
authors.workspace = true
|
|
||||||
license-file.workspace = true
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
serde_json.workspace = true
|
|
||||||
serde = { version = "1", default-features = false, features = ["derive"] }
|
|
||||||
serde_repr = "0.1"
|
|
|
@ -1,66 +0,0 @@
|
||||||
# A synchronous and simple Maelstrom crate
|
|
||||||
|
|
||||||
`nebkor-maelstreom` is a lean and simple synchronous library for writing
|
|
||||||
[Maelstrom](https://github.com/jepsen-io/maelstrom/tree/0186f398f96564a0453dda97c07daeac67c3d8d7)-compatible
|
|
||||||
distributed actors. It has three dependencies:
|
|
||||||
|
|
||||||
- serde
|
|
||||||
- serde_json
|
|
||||||
- serde_repr
|
|
||||||
|
|
||||||
For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program:
|
|
||||||
|
|
||||||
``` rust
|
|
||||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
|
||||||
|
|
||||||
struct Echo;
|
|
||||||
|
|
||||||
impl Node for Echo {
|
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message) {
|
|
||||||
let typ = &msg.body.typ;
|
|
||||||
if typ.as_str() == "echo" {
|
|
||||||
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
|
||||||
runner.reply(&msg, body);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let node = Echo;
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
|
||||||
|
|
||||||
runner.run(None);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## How to use
|
|
||||||
|
|
||||||
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
|
|
||||||
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
|
||||||
`send`, `reply`, and `rpc`.
|
|
||||||
|
|
||||||
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
|
|
||||||
Runner. The `run()` method takes an optional closure that will be run when the `init` Message is
|
|
||||||
received; see the
|
|
||||||
[broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/fff7fdc2d52f7d4c2d4d9c581ea16cdf0e1e3f30/gg-broadcast/src/main.rs#L18-L33)
|
|
||||||
program for an example of that, where it spawns a thread to send periodic messages to the node.
|
|
||||||
|
|
||||||
## Design considerations
|
|
||||||
|
|
||||||
I wanted the client code to be as simple as possible, with the least amount of boilerplate. Using
|
|
||||||
`&mut self` as the receiver for the `handle()` method lets you easily mutate state in your node if
|
|
||||||
you need to, without the ceremony of `Rc<Mutex<>>` and the like. Eschewing `async` results in an
|
|
||||||
order of magnitude fewer dependencies, and the entire workspace (crate and clients) can be compiled
|
|
||||||
in a couple seconds.
|
|
||||||
|
|
||||||
## Acknowledgments
|
|
||||||
|
|
||||||
I straight-up stole the design of the IO/network system from
|
|
||||||
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
|
|
||||||
call. Thanks!
|
|
||||||
|
|
||||||
|
|
||||||
## TODO
|
|
||||||
|
|
||||||
- add error handling.
|
|
|
@ -1,59 +0,0 @@
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use crate::{check_err, mk_payload, Body, RpcResult, Runner};
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
|
||||||
pub struct Kv {
|
|
||||||
pub service: &'static str,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Kv {
|
|
||||||
pub fn seq() -> Self {
|
|
||||||
Kv { service: "seq-kv" }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn lin() -> Self {
|
|
||||||
Kv { service: "lin-kv" }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn lww() -> Self {
|
|
||||||
Kv { service: "lww-kv" }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {
|
|
||||||
let payload = mk_payload(&[("key", key.into())]);
|
|
||||||
let body = Body::from_type("read").with_payload(payload);
|
|
||||||
let rx = runner.rpc(self.service, body);
|
|
||||||
let msg = rx.recv().unwrap();
|
|
||||||
check_err(&msg)?;
|
|
||||||
Ok(Some(msg.body.payload.get("value").unwrap().to_owned()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult {
|
|
||||||
let payload = mk_payload(&[("key", key.into()), ("value", val)]);
|
|
||||||
let body = Body::from_type("write").with_payload(payload);
|
|
||||||
let msg = runner.rpc(self.service, body).recv().unwrap();
|
|
||||||
check_err(&msg)?;
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cas(
|
|
||||||
&self,
|
|
||||||
runner: &Runner,
|
|
||||||
key: &str,
|
|
||||||
from: Value,
|
|
||||||
to: Value,
|
|
||||||
create: bool,
|
|
||||||
) -> RpcResult {
|
|
||||||
let payload = mk_payload(&[
|
|
||||||
("key", key.into()),
|
|
||||||
("from", from),
|
|
||||||
("to", to),
|
|
||||||
("create_if_not_exists", create.into()),
|
|
||||||
]);
|
|
||||||
let body = Body::from_type("cas").with_payload(payload);
|
|
||||||
let msg = runner.rpc(self.service, body).recv().unwrap();
|
|
||||||
check_err(&msg)?;
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,210 +0,0 @@
|
||||||
use std::{
|
|
||||||
collections::HashMap,
|
|
||||||
io::{BufRead, Write},
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicU64, Ordering},
|
|
||||||
mpsc::{channel, Receiver, Sender},
|
|
||||||
Arc, Mutex, OnceLock,
|
|
||||||
},
|
|
||||||
thread::{self},
|
|
||||||
};
|
|
||||||
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
pub mod protocol;
|
|
||||||
pub use protocol::{Body, ErrorCode, Message, Payload};
|
|
||||||
|
|
||||||
pub mod kv;
|
|
||||||
|
|
||||||
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
|
|
||||||
pub type OnInit = Box<dyn Fn(&Runner)>;
|
|
||||||
pub type RpcPromise = Receiver<Message>;
|
|
||||||
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
|
|
||||||
|
|
||||||
pub trait Node {
|
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Runner {
|
|
||||||
node: NodeyNodeFace,
|
|
||||||
node_id: OnceLock<String>,
|
|
||||||
nodes: OnceLock<Vec<String>>,
|
|
||||||
backdoor: OnceLock<Sender<Message>>,
|
|
||||||
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
|
||||||
outbound_tx: OnceLock<Sender<Message>>,
|
|
||||||
msg_id: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Runner {
|
|
||||||
pub fn new<N: Node + 'static>(node: N) -> Self {
|
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
Runner {
|
|
||||||
node,
|
|
||||||
nodes: OnceLock::new(),
|
|
||||||
node_id: OnceLock::new(),
|
|
||||||
backdoor: OnceLock::new(),
|
|
||||||
outbound_tx: OnceLock::new(),
|
|
||||||
promises: Default::default(),
|
|
||||||
msg_id: AtomicU64::new(1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(&self, on_init: Option<OnInit>) {
|
|
||||||
let (stdin_tx, stdin_rx) = channel();
|
|
||||||
thread::spawn(move || {
|
|
||||||
let stdin = std::io::stdin().lock().lines();
|
|
||||||
for line in stdin {
|
|
||||||
stdin_tx.send(line.unwrap()).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let (stdout_tx, stdout_rx) = channel();
|
|
||||||
thread::spawn(move || {
|
|
||||||
let mut stdout = std::io::stdout().lock();
|
|
||||||
for msg in stdout_rx {
|
|
||||||
writeln!(&mut stdout, "{msg}").unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let (outbound_tx, outbound_rx) = channel();
|
|
||||||
let _ = self.outbound_tx.get_or_init(|| outbound_tx);
|
|
||||||
thread::spawn(move || {
|
|
||||||
while let Ok(msg) = outbound_rx.recv() {
|
|
||||||
let msg = serde_json::to_string(&msg).unwrap();
|
|
||||||
stdout_tx.send(msg).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
self.process_input(stdin_rx, on_init);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_backdoor(&self) -> Sender<Message> {
|
|
||||||
self.backdoor.get().unwrap().clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn node_id(&self) -> String {
|
|
||||||
self.node_id.get().cloned().unwrap_or_default()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn next_msg_id(&self) -> u64 {
|
|
||||||
self.msg_id.fetch_add(1, Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cur_msg_id(&self) -> u64 {
|
|
||||||
self.msg_id.load(Ordering::SeqCst)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn nodes(&self) -> &[String] {
|
|
||||||
self.nodes.get().unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn reply(&self, req: &Message, body: Body) {
|
|
||||||
let mut body = body;
|
|
||||||
let dest = req.src.as_str();
|
|
||||||
let in_reply_to = req.body.msg_id;
|
|
||||||
body.in_reply_to = in_reply_to;
|
|
||||||
self.send(dest, body);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send(&self, dest: &str, body: Body) {
|
|
||||||
let msg = self.mk_msg(dest, body);
|
|
||||||
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
|
||||||
let msg = self.mk_msg(dest, body);
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
{
|
|
||||||
let msg_id = msg.body.msg_id;
|
|
||||||
let mut g = self.promises.lock().unwrap();
|
|
||||||
g.insert(msg_id, tx);
|
|
||||||
}
|
|
||||||
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
|
||||||
rx
|
|
||||||
}
|
|
||||||
|
|
||||||
// internal methods
|
|
||||||
fn init(&self, msg: &Message) {
|
|
||||||
let node_id = msg
|
|
||||||
.body
|
|
||||||
.payload
|
|
||||||
.get("node_id")
|
|
||||||
.unwrap()
|
|
||||||
.as_str()
|
|
||||||
.unwrap()
|
|
||||||
.to_owned();
|
|
||||||
|
|
||||||
let nodes = msg
|
|
||||||
.body
|
|
||||||
.payload
|
|
||||||
.get("node_ids")
|
|
||||||
.unwrap()
|
|
||||||
.as_array()
|
|
||||||
.unwrap()
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.as_str().unwrap().to_string())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let _ = self.node_id.get_or_init(|| node_id);
|
|
||||||
let _ = self.nodes.get_or_init(|| nodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_input(&self, stdin_rx: Receiver<String>, on_init: Option<OnInit>) {
|
|
||||||
let (json_tx, json_rx) = channel();
|
|
||||||
let _ = self.backdoor.get_or_init(|| json_tx.clone());
|
|
||||||
let proms = self.promises.clone();
|
|
||||||
thread::spawn(move || {
|
|
||||||
for line in stdin_rx {
|
|
||||||
let msg: Message = serde_json::from_str(&line).unwrap();
|
|
||||||
let irt = msg.body.in_reply_to;
|
|
||||||
if let Some(tx) = proms.lock().unwrap().remove(&irt) {
|
|
||||||
tx.send(msg).unwrap();
|
|
||||||
} else {
|
|
||||||
json_tx.send(msg).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let msg = json_rx.recv().unwrap();
|
|
||||||
{
|
|
||||||
self.init(&msg);
|
|
||||||
let body = Body::from_type("init_ok");
|
|
||||||
self.reply(&msg, body);
|
|
||||||
if let Some(on_init) = on_init {
|
|
||||||
on_init(self);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut node = self.node.lock().unwrap();
|
|
||||||
for msg in json_rx {
|
|
||||||
node.handle(self, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn mk_msg(&self, dest: &str, body: Body) -> Message {
|
|
||||||
let mut body = body;
|
|
||||||
if body.msg_id == 0 {
|
|
||||||
body.msg_id = self.next_msg_id();
|
|
||||||
}
|
|
||||||
Message {
|
|
||||||
src: self.node_id().to_string(),
|
|
||||||
dest: dest.to_string(),
|
|
||||||
body,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn check_err(msg: &Message) -> RpcResult {
|
|
||||||
if msg.body.typ.as_str() == "error" {
|
|
||||||
let error = msg.body.code.unwrap();
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
|
|
||||||
payload
|
|
||||||
.iter()
|
|
||||||
.map(|p| (p.0.to_string(), p.1.clone()))
|
|
||||||
.collect()
|
|
||||||
}
|
|
|
@ -1,118 +0,0 @@
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_json::{Map, Value};
|
|
||||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
|
||||||
|
|
||||||
pub type Payload = Map<String, Value>;
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Message {
|
|
||||||
pub src: String,
|
|
||||||
pub dest: String,
|
|
||||||
pub body: Body,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Body {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub typ: String,
|
|
||||||
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
|
||||||
pub msg_id: u64,
|
|
||||||
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
|
||||||
pub in_reply_to: u64,
|
|
||||||
#[serde(flatten)]
|
|
||||||
pub payload: Payload,
|
|
||||||
// the following are for the case of errors
|
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
|
||||||
pub code: Option<ErrorCode>,
|
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
|
||||||
pub text: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Body {
|
|
||||||
pub fn from_type(typ: &str) -> Self {
|
|
||||||
Body {
|
|
||||||
typ: typ.to_string(),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_msg_id(self, msg_id: u64) -> Self {
|
|
||||||
let mut b = self;
|
|
||||||
b.msg_id = msg_id;
|
|
||||||
b
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_in_reply_to(self, in_reply_to: u64) -> Self {
|
|
||||||
let mut b = self;
|
|
||||||
b.in_reply_to = in_reply_to;
|
|
||||||
b
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_payload(self, payload: Payload) -> Self {
|
|
||||||
let mut b = self;
|
|
||||||
b.payload = payload;
|
|
||||||
b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
|
|
||||||
Body::from_type("init_ok")
|
|
||||||
.with_msg_id(msg_id)
|
|
||||||
.with_in_reply_to(in_reply_to)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
|
||||||
#[serde(untagged)]
|
|
||||||
pub enum ErrorCode {
|
|
||||||
Definite(DefiniteError),
|
|
||||||
Indefinite(IndefiniteError),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
|
|
||||||
#[repr(u64)]
|
|
||||||
pub enum DefiniteError {
|
|
||||||
NodeNotFound = 2,
|
|
||||||
NotSupported = 10,
|
|
||||||
TemporarilyUnavailable = 11,
|
|
||||||
MalformedRequest = 12,
|
|
||||||
Abort = 14,
|
|
||||||
KeyNotFound = 20,
|
|
||||||
KeyAlreadyExists = 21,
|
|
||||||
PreconditionFailed = 22,
|
|
||||||
TxnConflict = 30,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
|
|
||||||
#[repr(u64)]
|
|
||||||
pub enum IndefiniteError {
|
|
||||||
Timeout = 0,
|
|
||||||
Crash = 13,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
|
|
||||||
Body {
|
|
||||||
typ: "error".to_string(),
|
|
||||||
msg_id,
|
|
||||||
in_reply_to,
|
|
||||||
code: Some(code),
|
|
||||||
text: text.map(|t| t.to_string()),
|
|
||||||
payload: Default::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::trivially_copy_pass_by_ref)]
|
|
||||||
fn u64_zero_by_ref(num: &u64) -> bool {
|
|
||||||
*num == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn error_codes() {
|
|
||||||
let ec = ErrorCode::Definite(DefiniteError::Abort);
|
|
||||||
let e = serde_json::to_string(&ec).unwrap();
|
|
||||||
assert_eq!(&e, "14");
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue