Compare commits

..

3 commits

Author SHA1 Message Date
Joe Ardent
067a03b918 tweak the docs a bit 2025-07-16 14:49:38 -07:00
Joe Ardent
ad1b8ccb31 rename 'promises' to 'futures' for rpc returns 2025-07-16 14:25:36 -07:00
Joe Ardent
0c4496ae07 rename 'encoded_message' to 'decoded_message' 2024-11-10 12:11:58 -08:00
6 changed files with 37 additions and 34 deletions

View file

@ -1,4 +1,4 @@
imports_granularity = "Crate"
group_imports = "StdExternalCrate"
wrap_comments = true
edition = "2021"
edition = "2024"

View file

@ -1,7 +1,7 @@
[package]
name = "nebkor-maelstrom"
edition = "2021"
version = "1.0.0"
edition = "2024"
version = "1.1.0"
license-file = "LICENSE.md"
readme = "README.md"
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."

View file

@ -7,8 +7,8 @@ distributed actors. It has three dependencies:
- serde
- serde_json
- serde_repr
For a simple example, see the [echo](https://git.kittencollective.com/nebkor/nebkor-maelstrom/src/branch/main/examples/echo.rs) example:
Here's a simple and complete example that implements the single node [echo](https://fly.io/dist-sys/1/) challenge:
``` rust
use nebkor_maelstrom::{Body, Message, Node, Runner};
@ -55,7 +55,7 @@ much away.
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
`send`, `reply`, and `rpc`.
`send()`, `reply()`, and `rpc()`.
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
Runner. The `run()` method takes an optional callback that will be run when the `init` Message is
@ -83,6 +83,6 @@ diving into its source from your IDE or browser.
## Acknowledgments
I straight-up stole the design of the IO/network system from
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
call. Thanks! And thanks to Nicole for nudging me to publish this.
I straight-up stole the initial design of the IO/network system from
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get working RPC
calls; thanks! And thanks to Nicole for nudging me to publish this.

View file

@ -1,6 +1,4 @@
use crate::Value;
use crate::{check_err, mk_payload, Body, RpcResult, Runner};
use crate::{Body, RpcResult, Runner, Value, check_err, mk_payload};
/// A convenient way to do RPC with Maelstrom's KV services.
#[derive(Debug, Default, Clone)]
@ -37,7 +35,8 @@ impl Kv {
/// let kv = Kv::seq();
/// let result = kv.read(&runner, "MY_KEY");
/// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok`
/// // and the body will always be `Some(Value)`.
/// // and the body will always be `Some(Value)`; otherwise, it will be an `Err`, probably
/// // an `ErrorCode::Definite(DefiniteError::KeyNotFound)`.
/// assert!(result.is_ok() && result.unwrap().is_some());
/// ```
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {

View file

@ -2,9 +2,9 @@ use std::{
collections::HashMap,
io::{BufRead, Write},
sync::{
atomic::{AtomicU64, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, Mutex, OnceLock,
atomic::{AtomicU64, Ordering},
mpsc::{Receiver, Sender, channel},
},
thread::{self},
};
@ -18,7 +18,7 @@ pub mod kv;
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
pub type OnInit = Box<dyn Fn(&Runner)>;
pub type RpcPromise = Receiver<Message>;
pub type RpcFuture = Receiver<Message>;
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
pub trait Node {
@ -49,14 +49,15 @@ impl Runner {
}
}
/// Start processing messages from stdin and sending them to your node. The `on_init` argument
/// is an optional callback that will be called with `&self` after the `init` message from
/// Maelstrom has been processed.
/// Start processing messages from stdin and sending them to your node. The
/// `on_init` argument is an optional callback that will be called with
/// `&self` after the `init` message from Maelstrom has been processed.
///
/// # Examples
///
/// ```no_run
/// use nebkor_maelstrom::{Body, Message, Node, Runner};
///
/// struct Foo;
/// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }}
///
@ -88,9 +89,10 @@ impl Runner {
self.process_input(on_init);
}
/// Get a Sender that will send Messages to the node as input. Useful for triggering periodic
/// behavior from a separate thread, or for sending a Message to the node from `on_init`. See
/// the `broadcast` example for a use of it.
/// Get a Sender that will send Messages to the node as input. Useful for
/// triggering periodic behavior from a separate thread, or for sending
/// a Message to the node from `on_init`. See the `broadcast` example
/// for a use of it.
pub fn get_backdoor(&self) -> Sender<Message> {
self.backdoor.get().unwrap().clone()
}
@ -123,14 +125,15 @@ impl Runner {
self.outbound_tx.get().unwrap().send(msg).unwrap();
}
/// Returns a Receiver<Message> that will receive the reply from the request.
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
/// Returns a Receiver<Message> that will receive the reply from the
/// request.
pub fn rpc(&self, dest: &str, body: Body) -> RpcFuture {
let msg = self.mk_msg(dest, body);
let (tx, rx) = channel();
{
let msg_id = msg.body.msg_id;
let mut g = self.promises.lock().unwrap();
g.insert(msg_id, tx);
let mut promises = self.promises.lock().unwrap();
promises.insert(msg_id, tx);
}
self.outbound_tx.get().unwrap().send(msg).unwrap();
rx
@ -164,8 +167,8 @@ impl Runner {
fn process_input(&self, on_init: Option<OnInit>) {
// for sending Messages to the node's inputs
let (encoded_input_tx, encoded_input_rx) = channel();
let _ = self.backdoor.get_or_init(|| encoded_input_tx.clone());
let (decoded_input_tx, decoded_input_rx) = channel();
let _ = self.backdoor.get_or_init(|| decoded_input_tx.clone());
// decouple stdin from processing
let proms = self.promises.clone();
@ -179,13 +182,13 @@ impl Runner {
promise.send(msg).unwrap();
} else {
// just let the node's `handle()` method handle it
encoded_input_tx.send(msg).unwrap();
decoded_input_tx.send(msg).unwrap();
}
}
});
// first Message is always `init`:
let msg = encoded_input_rx.recv().unwrap();
let msg = decoded_input_rx.recv().unwrap();
{
self.init(&msg);
let body = Body::from_type("init_ok");
@ -197,7 +200,7 @@ impl Runner {
// every other message is for the node's handle() method
let mut node = self.node.lock().unwrap();
for msg in encoded_input_rx {
for msg in decoded_input_rx {
node.handle(self, msg);
}
}

View file

@ -52,9 +52,10 @@ impl Message {
m
}
/// The Maelstrom type of a Body (and hence a Message) is just a string. This is for the sake of
/// ease of use for doing the Gossip Glomers challenges; this crate is not meant to be a real
/// network client framework.
/// The Maelstrom type of a Body (and hence a Message) is just a string.
/// This is for the sake of ease of use for doing the Gossip Glomers
/// challenges; this crate is not meant to be a real network client
/// framework.
pub fn typ(&self) -> &str {
self.body.typ.as_str()
}