Compare commits
3 commits
e5150af666
...
067a03b918
Author | SHA1 | Date | |
---|---|---|---|
|
067a03b918 | ||
|
ad1b8ccb31 | ||
|
0c4496ae07 |
6 changed files with 37 additions and 34 deletions
|
@ -1,4 +1,4 @@
|
||||||
imports_granularity = "Crate"
|
imports_granularity = "Crate"
|
||||||
group_imports = "StdExternalCrate"
|
group_imports = "StdExternalCrate"
|
||||||
wrap_comments = true
|
wrap_comments = true
|
||||||
edition = "2021"
|
edition = "2024"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "nebkor-maelstrom"
|
name = "nebkor-maelstrom"
|
||||||
edition = "2021"
|
edition = "2024"
|
||||||
version = "1.0.0"
|
version = "1.1.0"
|
||||||
license-file = "LICENSE.md"
|
license-file = "LICENSE.md"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."
|
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."
|
||||||
|
|
12
README.md
12
README.md
|
@ -7,8 +7,8 @@ distributed actors. It has three dependencies:
|
||||||
- serde
|
- serde
|
||||||
- serde_json
|
- serde_json
|
||||||
- serde_repr
|
- serde_repr
|
||||||
|
|
||||||
For a simple example, see the [echo](https://git.kittencollective.com/nebkor/nebkor-maelstrom/src/branch/main/examples/echo.rs) example:
|
Here's a simple and complete example that implements the single node [echo](https://fly.io/dist-sys/1/) challenge:
|
||||||
|
|
||||||
``` rust
|
``` rust
|
||||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||||
|
@ -55,7 +55,7 @@ much away.
|
||||||
|
|
||||||
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
|
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
|
||||||
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
||||||
`send`, `reply`, and `rpc`.
|
`send()`, `reply()`, and `rpc()`.
|
||||||
|
|
||||||
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
|
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
|
||||||
Runner. The `run()` method takes an optional callback that will be run when the `init` Message is
|
Runner. The `run()` method takes an optional callback that will be run when the `init` Message is
|
||||||
|
@ -83,6 +83,6 @@ diving into its source from your IDE or browser.
|
||||||
|
|
||||||
## Acknowledgments
|
## Acknowledgments
|
||||||
|
|
||||||
I straight-up stole the design of the IO/network system from
|
I straight-up stole the initial design of the IO/network system from
|
||||||
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
|
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get working RPC
|
||||||
call. Thanks! And thanks to Nicole for nudging me to publish this.
|
calls; thanks! And thanks to Nicole for nudging me to publish this.
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
use crate::Value;
|
use crate::{Body, RpcResult, Runner, Value, check_err, mk_payload};
|
||||||
|
|
||||||
use crate::{check_err, mk_payload, Body, RpcResult, Runner};
|
|
||||||
|
|
||||||
/// A convenient way to do RPC with Maelstrom's KV services.
|
/// A convenient way to do RPC with Maelstrom's KV services.
|
||||||
#[derive(Debug, Default, Clone)]
|
#[derive(Debug, Default, Clone)]
|
||||||
|
@ -37,7 +35,8 @@ impl Kv {
|
||||||
/// let kv = Kv::seq();
|
/// let kv = Kv::seq();
|
||||||
/// let result = kv.read(&runner, "MY_KEY");
|
/// let result = kv.read(&runner, "MY_KEY");
|
||||||
/// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok`
|
/// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok`
|
||||||
/// // and the body will always be `Some(Value)`.
|
/// // and the body will always be `Some(Value)`; otherwise, it will be an `Err`, probably
|
||||||
|
/// // an `ErrorCode::Definite(DefiniteError::KeyNotFound)`.
|
||||||
/// assert!(result.is_ok() && result.unwrap().is_some());
|
/// assert!(result.is_ok() && result.unwrap().is_some());
|
||||||
/// ```
|
/// ```
|
||||||
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {
|
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {
|
||||||
|
|
39
src/lib.rs
39
src/lib.rs
|
@ -2,9 +2,9 @@ use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
io::{BufRead, Write},
|
io::{BufRead, Write},
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, Ordering},
|
|
||||||
mpsc::{channel, Receiver, Sender},
|
|
||||||
Arc, Mutex, OnceLock,
|
Arc, Mutex, OnceLock,
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
mpsc::{Receiver, Sender, channel},
|
||||||
},
|
},
|
||||||
thread::{self},
|
thread::{self},
|
||||||
};
|
};
|
||||||
|
@ -18,7 +18,7 @@ pub mod kv;
|
||||||
|
|
||||||
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
|
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
|
||||||
pub type OnInit = Box<dyn Fn(&Runner)>;
|
pub type OnInit = Box<dyn Fn(&Runner)>;
|
||||||
pub type RpcPromise = Receiver<Message>;
|
pub type RpcFuture = Receiver<Message>;
|
||||||
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
|
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
|
||||||
|
|
||||||
pub trait Node {
|
pub trait Node {
|
||||||
|
@ -49,14 +49,15 @@ impl Runner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start processing messages from stdin and sending them to your node. The `on_init` argument
|
/// Start processing messages from stdin and sending them to your node. The
|
||||||
/// is an optional callback that will be called with `&self` after the `init` message from
|
/// `on_init` argument is an optional callback that will be called with
|
||||||
/// Maelstrom has been processed.
|
/// `&self` after the `init` message from Maelstrom has been processed.
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```no_run
|
/// ```no_run
|
||||||
/// use nebkor_maelstrom::{Body, Message, Node, Runner};
|
/// use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||||
|
///
|
||||||
/// struct Foo;
|
/// struct Foo;
|
||||||
/// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }}
|
/// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }}
|
||||||
///
|
///
|
||||||
|
@ -88,9 +89,10 @@ impl Runner {
|
||||||
self.process_input(on_init);
|
self.process_input(on_init);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a Sender that will send Messages to the node as input. Useful for triggering periodic
|
/// Get a Sender that will send Messages to the node as input. Useful for
|
||||||
/// behavior from a separate thread, or for sending a Message to the node from `on_init`. See
|
/// triggering periodic behavior from a separate thread, or for sending
|
||||||
/// the `broadcast` example for a use of it.
|
/// a Message to the node from `on_init`. See the `broadcast` example
|
||||||
|
/// for a use of it.
|
||||||
pub fn get_backdoor(&self) -> Sender<Message> {
|
pub fn get_backdoor(&self) -> Sender<Message> {
|
||||||
self.backdoor.get().unwrap().clone()
|
self.backdoor.get().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
@ -123,14 +125,15 @@ impl Runner {
|
||||||
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a Receiver<Message> that will receive the reply from the request.
|
/// Returns a Receiver<Message> that will receive the reply from the
|
||||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
/// request.
|
||||||
|
pub fn rpc(&self, dest: &str, body: Body) -> RpcFuture {
|
||||||
let msg = self.mk_msg(dest, body);
|
let msg = self.mk_msg(dest, body);
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
{
|
{
|
||||||
let msg_id = msg.body.msg_id;
|
let msg_id = msg.body.msg_id;
|
||||||
let mut g = self.promises.lock().unwrap();
|
let mut promises = self.promises.lock().unwrap();
|
||||||
g.insert(msg_id, tx);
|
promises.insert(msg_id, tx);
|
||||||
}
|
}
|
||||||
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
||||||
rx
|
rx
|
||||||
|
@ -164,8 +167,8 @@ impl Runner {
|
||||||
|
|
||||||
fn process_input(&self, on_init: Option<OnInit>) {
|
fn process_input(&self, on_init: Option<OnInit>) {
|
||||||
// for sending Messages to the node's inputs
|
// for sending Messages to the node's inputs
|
||||||
let (encoded_input_tx, encoded_input_rx) = channel();
|
let (decoded_input_tx, decoded_input_rx) = channel();
|
||||||
let _ = self.backdoor.get_or_init(|| encoded_input_tx.clone());
|
let _ = self.backdoor.get_or_init(|| decoded_input_tx.clone());
|
||||||
|
|
||||||
// decouple stdin from processing
|
// decouple stdin from processing
|
||||||
let proms = self.promises.clone();
|
let proms = self.promises.clone();
|
||||||
|
@ -179,13 +182,13 @@ impl Runner {
|
||||||
promise.send(msg).unwrap();
|
promise.send(msg).unwrap();
|
||||||
} else {
|
} else {
|
||||||
// just let the node's `handle()` method handle it
|
// just let the node's `handle()` method handle it
|
||||||
encoded_input_tx.send(msg).unwrap();
|
decoded_input_tx.send(msg).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// first Message is always `init`:
|
// first Message is always `init`:
|
||||||
let msg = encoded_input_rx.recv().unwrap();
|
let msg = decoded_input_rx.recv().unwrap();
|
||||||
{
|
{
|
||||||
self.init(&msg);
|
self.init(&msg);
|
||||||
let body = Body::from_type("init_ok");
|
let body = Body::from_type("init_ok");
|
||||||
|
@ -197,7 +200,7 @@ impl Runner {
|
||||||
|
|
||||||
// every other message is for the node's handle() method
|
// every other message is for the node's handle() method
|
||||||
let mut node = self.node.lock().unwrap();
|
let mut node = self.node.lock().unwrap();
|
||||||
for msg in encoded_input_rx {
|
for msg in decoded_input_rx {
|
||||||
node.handle(self, msg);
|
node.handle(self, msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,9 +52,10 @@ impl Message {
|
||||||
m
|
m
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The Maelstrom type of a Body (and hence a Message) is just a string. This is for the sake of
|
/// The Maelstrom type of a Body (and hence a Message) is just a string.
|
||||||
/// ease of use for doing the Gossip Glomers challenges; this crate is not meant to be a real
|
/// This is for the sake of ease of use for doing the Gossip Glomers
|
||||||
/// network client framework.
|
/// challenges; this crate is not meant to be a real network client
|
||||||
|
/// framework.
|
||||||
pub fn typ(&self) -> &str {
|
pub fn typ(&self) -> &str {
|
||||||
self.body.typ.as_str()
|
self.body.typ.as_str()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue