Compare commits
3 commits
e5150af666
...
067a03b918
Author | SHA1 | Date | |
---|---|---|---|
|
067a03b918 | ||
|
ad1b8ccb31 | ||
|
0c4496ae07 |
6 changed files with 37 additions and 34 deletions
|
@ -1,4 +1,4 @@
|
|||
imports_granularity = "Crate"
|
||||
group_imports = "StdExternalCrate"
|
||||
wrap_comments = true
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "nebkor-maelstrom"
|
||||
edition = "2021"
|
||||
version = "1.0.0"
|
||||
edition = "2024"
|
||||
version = "1.1.0"
|
||||
license-file = "LICENSE.md"
|
||||
readme = "README.md"
|
||||
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."
|
||||
|
|
12
README.md
12
README.md
|
@ -7,8 +7,8 @@ distributed actors. It has three dependencies:
|
|||
- serde
|
||||
- serde_json
|
||||
- serde_repr
|
||||
|
||||
For a simple example, see the [echo](https://git.kittencollective.com/nebkor/nebkor-maelstrom/src/branch/main/examples/echo.rs) example:
|
||||
|
||||
Here's a simple and complete example that implements the single node [echo](https://fly.io/dist-sys/1/) challenge:
|
||||
|
||||
``` rust
|
||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||
|
@ -55,7 +55,7 @@ much away.
|
|||
|
||||
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
|
||||
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
||||
`send`, `reply`, and `rpc`.
|
||||
`send()`, `reply()`, and `rpc()`.
|
||||
|
||||
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
|
||||
Runner. The `run()` method takes an optional callback that will be run when the `init` Message is
|
||||
|
@ -83,6 +83,6 @@ diving into its source from your IDE or browser.
|
|||
|
||||
## Acknowledgments
|
||||
|
||||
I straight-up stole the design of the IO/network system from
|
||||
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
|
||||
call. Thanks! And thanks to Nicole for nudging me to publish this.
|
||||
I straight-up stole the initial design of the IO/network system from
|
||||
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get working RPC
|
||||
calls; thanks! And thanks to Nicole for nudging me to publish this.
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
use crate::Value;
|
||||
|
||||
use crate::{check_err, mk_payload, Body, RpcResult, Runner};
|
||||
use crate::{Body, RpcResult, Runner, Value, check_err, mk_payload};
|
||||
|
||||
/// A convenient way to do RPC with Maelstrom's KV services.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
|
@ -37,7 +35,8 @@ impl Kv {
|
|||
/// let kv = Kv::seq();
|
||||
/// let result = kv.read(&runner, "MY_KEY");
|
||||
/// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok`
|
||||
/// // and the body will always be `Some(Value)`.
|
||||
/// // and the body will always be `Some(Value)`; otherwise, it will be an `Err`, probably
|
||||
/// // an `ErrorCode::Definite(DefiniteError::KeyNotFound)`.
|
||||
/// assert!(result.is_ok() && result.unwrap().is_some());
|
||||
/// ```
|
||||
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {
|
||||
|
|
39
src/lib.rs
39
src/lib.rs
|
@ -2,9 +2,9 @@ use std::{
|
|||
collections::HashMap,
|
||||
io::{BufRead, Write},
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc, Mutex, OnceLock,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
mpsc::{Receiver, Sender, channel},
|
||||
},
|
||||
thread::{self},
|
||||
};
|
||||
|
@ -18,7 +18,7 @@ pub mod kv;
|
|||
|
||||
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
|
||||
pub type OnInit = Box<dyn Fn(&Runner)>;
|
||||
pub type RpcPromise = Receiver<Message>;
|
||||
pub type RpcFuture = Receiver<Message>;
|
||||
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
|
||||
|
||||
pub trait Node {
|
||||
|
@ -49,14 +49,15 @@ impl Runner {
|
|||
}
|
||||
}
|
||||
|
||||
/// Start processing messages from stdin and sending them to your node. The `on_init` argument
|
||||
/// is an optional callback that will be called with `&self` after the `init` message from
|
||||
/// Maelstrom has been processed.
|
||||
/// Start processing messages from stdin and sending them to your node. The
|
||||
/// `on_init` argument is an optional callback that will be called with
|
||||
/// `&self` after the `init` message from Maelstrom has been processed.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||
///
|
||||
/// struct Foo;
|
||||
/// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }}
|
||||
///
|
||||
|
@ -88,9 +89,10 @@ impl Runner {
|
|||
self.process_input(on_init);
|
||||
}
|
||||
|
||||
/// Get a Sender that will send Messages to the node as input. Useful for triggering periodic
|
||||
/// behavior from a separate thread, or for sending a Message to the node from `on_init`. See
|
||||
/// the `broadcast` example for a use of it.
|
||||
/// Get a Sender that will send Messages to the node as input. Useful for
|
||||
/// triggering periodic behavior from a separate thread, or for sending
|
||||
/// a Message to the node from `on_init`. See the `broadcast` example
|
||||
/// for a use of it.
|
||||
pub fn get_backdoor(&self) -> Sender<Message> {
|
||||
self.backdoor.get().unwrap().clone()
|
||||
}
|
||||
|
@ -123,14 +125,15 @@ impl Runner {
|
|||
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
||||
}
|
||||
|
||||
/// Returns a Receiver<Message> that will receive the reply from the request.
|
||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
||||
/// Returns a Receiver<Message> that will receive the reply from the
|
||||
/// request.
|
||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcFuture {
|
||||
let msg = self.mk_msg(dest, body);
|
||||
let (tx, rx) = channel();
|
||||
{
|
||||
let msg_id = msg.body.msg_id;
|
||||
let mut g = self.promises.lock().unwrap();
|
||||
g.insert(msg_id, tx);
|
||||
let mut promises = self.promises.lock().unwrap();
|
||||
promises.insert(msg_id, tx);
|
||||
}
|
||||
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
||||
rx
|
||||
|
@ -164,8 +167,8 @@ impl Runner {
|
|||
|
||||
fn process_input(&self, on_init: Option<OnInit>) {
|
||||
// for sending Messages to the node's inputs
|
||||
let (encoded_input_tx, encoded_input_rx) = channel();
|
||||
let _ = self.backdoor.get_or_init(|| encoded_input_tx.clone());
|
||||
let (decoded_input_tx, decoded_input_rx) = channel();
|
||||
let _ = self.backdoor.get_or_init(|| decoded_input_tx.clone());
|
||||
|
||||
// decouple stdin from processing
|
||||
let proms = self.promises.clone();
|
||||
|
@ -179,13 +182,13 @@ impl Runner {
|
|||
promise.send(msg).unwrap();
|
||||
} else {
|
||||
// just let the node's `handle()` method handle it
|
||||
encoded_input_tx.send(msg).unwrap();
|
||||
decoded_input_tx.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// first Message is always `init`:
|
||||
let msg = encoded_input_rx.recv().unwrap();
|
||||
let msg = decoded_input_rx.recv().unwrap();
|
||||
{
|
||||
self.init(&msg);
|
||||
let body = Body::from_type("init_ok");
|
||||
|
@ -197,7 +200,7 @@ impl Runner {
|
|||
|
||||
// every other message is for the node's handle() method
|
||||
let mut node = self.node.lock().unwrap();
|
||||
for msg in encoded_input_rx {
|
||||
for msg in decoded_input_rx {
|
||||
node.handle(self, msg);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,9 +52,10 @@ impl Message {
|
|||
m
|
||||
}
|
||||
|
||||
/// The Maelstrom type of a Body (and hence a Message) is just a string. This is for the sake of
|
||||
/// ease of use for doing the Gossip Glomers challenges; this crate is not meant to be a real
|
||||
/// network client framework.
|
||||
/// The Maelstrom type of a Body (and hence a Message) is just a string.
|
||||
/// This is for the sake of ease of use for doing the Gossip Glomers
|
||||
/// challenges; this crate is not meant to be a real network client
|
||||
/// framework.
|
||||
pub fn typ(&self) -> &str {
|
||||
self.body.typ.as_str()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue