Compare commits

..

No commits in common. "067a03b918e1369eced41842db1248b87afc4b35" and "e5150af666e0531a3a1615254718b63df117abb9" have entirely different histories.

6 changed files with 34 additions and 37 deletions

View file

@ -1,4 +1,4 @@
imports_granularity = "Crate" imports_granularity = "Crate"
group_imports = "StdExternalCrate" group_imports = "StdExternalCrate"
wrap_comments = true wrap_comments = true
edition = "2024" edition = "2021"

View file

@ -1,7 +1,7 @@
[package] [package]
name = "nebkor-maelstrom" name = "nebkor-maelstrom"
edition = "2024" edition = "2021"
version = "1.1.0" version = "1.0.0"
license-file = "LICENSE.md" license-file = "LICENSE.md"
readme = "README.md" readme = "README.md"
description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients." description = "An easy-to-use and synchronous client for creating Maelstrom distributed clients."

View file

@ -7,8 +7,8 @@ distributed actors. It has three dependencies:
- serde - serde
- serde_json - serde_json
- serde_repr - serde_repr
Here's a simple and complete example that implements the single node [echo](https://fly.io/dist-sys/1/) challenge: For a simple example, see the [echo](https://git.kittencollective.com/nebkor/nebkor-maelstrom/src/branch/main/examples/echo.rs) example:
``` rust ``` rust
use nebkor_maelstrom::{Body, Message, Node, Runner}; use nebkor_maelstrom::{Body, Message, Node, Runner};
@ -55,7 +55,7 @@ much away.
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method, Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like `handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
`send()`, `reply()`, and `rpc()`. `send`, `reply`, and `rpc`.
In your main function, instantiate that struct and pass that into `Runner::new()` to get a In your main function, instantiate that struct and pass that into `Runner::new()` to get a
Runner. The `run()` method takes an optional callback that will be run when the `init` Message is Runner. The `run()` method takes an optional callback that will be run when the `init` Message is
@ -83,6 +83,6 @@ diving into its source from your IDE or browser.
## Acknowledgments ## Acknowledgments
I straight-up stole the initial design of the IO/network system from I straight-up stole the design of the IO/network system from
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get working RPC [Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
calls; thanks! And thanks to Nicole for nudging me to publish this. call. Thanks! And thanks to Nicole for nudging me to publish this.

View file

@ -1,4 +1,6 @@
use crate::{Body, RpcResult, Runner, Value, check_err, mk_payload}; use crate::Value;
use crate::{check_err, mk_payload, Body, RpcResult, Runner};
/// A convenient way to do RPC with Maelstrom's KV services. /// A convenient way to do RPC with Maelstrom's KV services.
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
@ -35,8 +37,7 @@ impl Kv {
/// let kv = Kv::seq(); /// let kv = Kv::seq();
/// let result = kv.read(&runner, "MY_KEY"); /// let result = kv.read(&runner, "MY_KEY");
/// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok` /// // if "MY_KEY" had previously been written into the store, then the Result will be `Ok`
/// // and the body will always be `Some(Value)`; otherwise, it will be an `Err`, probably /// // and the body will always be `Some(Value)`.
/// // an `ErrorCode::Definite(DefiniteError::KeyNotFound)`.
/// assert!(result.is_ok() && result.unwrap().is_some()); /// assert!(result.is_ok() && result.unwrap().is_some());
/// ``` /// ```
pub fn read(&self, runner: &Runner, key: &str) -> RpcResult { pub fn read(&self, runner: &Runner, key: &str) -> RpcResult {

View file

@ -2,9 +2,9 @@ use std::{
collections::HashMap, collections::HashMap,
io::{BufRead, Write}, io::{BufRead, Write},
sync::{ sync::{
Arc, Mutex, OnceLock,
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
mpsc::{Receiver, Sender, channel}, mpsc::{channel, Receiver, Sender},
Arc, Mutex, OnceLock,
}, },
thread::{self}, thread::{self},
}; };
@ -18,7 +18,7 @@ pub mod kv;
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>; pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
pub type OnInit = Box<dyn Fn(&Runner)>; pub type OnInit = Box<dyn Fn(&Runner)>;
pub type RpcFuture = Receiver<Message>; pub type RpcPromise = Receiver<Message>;
pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>; pub type RpcResult = std::result::Result<Option<Value>, ErrorCode>;
pub trait Node { pub trait Node {
@ -49,15 +49,14 @@ impl Runner {
} }
} }
/// Start processing messages from stdin and sending them to your node. The /// Start processing messages from stdin and sending them to your node. The `on_init` argument
/// `on_init` argument is an optional callback that will be called with /// is an optional callback that will be called with `&self` after the `init` message from
/// `&self` after the `init` message from Maelstrom has been processed. /// Maelstrom has been processed.
/// ///
/// # Examples /// # Examples
/// ///
/// ```no_run /// ```no_run
/// use nebkor_maelstrom::{Body, Message, Node, Runner}; /// use nebkor_maelstrom::{Body, Message, Node, Runner};
///
/// struct Foo; /// struct Foo;
/// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }} /// impl Node for Foo {fn handle(&mut self, _runner: &Runner, _msg: Message) { /* empty impl */ }}
/// ///
@ -89,10 +88,9 @@ impl Runner {
self.process_input(on_init); self.process_input(on_init);
} }
/// Get a Sender that will send Messages to the node as input. Useful for /// Get a Sender that will send Messages to the node as input. Useful for triggering periodic
/// triggering periodic behavior from a separate thread, or for sending /// behavior from a separate thread, or for sending a Message to the node from `on_init`. See
/// a Message to the node from `on_init`. See the `broadcast` example /// the `broadcast` example for a use of it.
/// for a use of it.
pub fn get_backdoor(&self) -> Sender<Message> { pub fn get_backdoor(&self) -> Sender<Message> {
self.backdoor.get().unwrap().clone() self.backdoor.get().unwrap().clone()
} }
@ -125,15 +123,14 @@ impl Runner {
self.outbound_tx.get().unwrap().send(msg).unwrap(); self.outbound_tx.get().unwrap().send(msg).unwrap();
} }
/// Returns a Receiver<Message> that will receive the reply from the /// Returns a Receiver<Message> that will receive the reply from the request.
/// request. pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
pub fn rpc(&self, dest: &str, body: Body) -> RpcFuture {
let msg = self.mk_msg(dest, body); let msg = self.mk_msg(dest, body);
let (tx, rx) = channel(); let (tx, rx) = channel();
{ {
let msg_id = msg.body.msg_id; let msg_id = msg.body.msg_id;
let mut promises = self.promises.lock().unwrap(); let mut g = self.promises.lock().unwrap();
promises.insert(msg_id, tx); g.insert(msg_id, tx);
} }
self.outbound_tx.get().unwrap().send(msg).unwrap(); self.outbound_tx.get().unwrap().send(msg).unwrap();
rx rx
@ -167,8 +164,8 @@ impl Runner {
fn process_input(&self, on_init: Option<OnInit>) { fn process_input(&self, on_init: Option<OnInit>) {
// for sending Messages to the node's inputs // for sending Messages to the node's inputs
let (decoded_input_tx, decoded_input_rx) = channel(); let (encoded_input_tx, encoded_input_rx) = channel();
let _ = self.backdoor.get_or_init(|| decoded_input_tx.clone()); let _ = self.backdoor.get_or_init(|| encoded_input_tx.clone());
// decouple stdin from processing // decouple stdin from processing
let proms = self.promises.clone(); let proms = self.promises.clone();
@ -182,13 +179,13 @@ impl Runner {
promise.send(msg).unwrap(); promise.send(msg).unwrap();
} else { } else {
// just let the node's `handle()` method handle it // just let the node's `handle()` method handle it
decoded_input_tx.send(msg).unwrap(); encoded_input_tx.send(msg).unwrap();
} }
} }
}); });
// first Message is always `init`: // first Message is always `init`:
let msg = decoded_input_rx.recv().unwrap(); let msg = encoded_input_rx.recv().unwrap();
{ {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
@ -200,7 +197,7 @@ impl Runner {
// every other message is for the node's handle() method // every other message is for the node's handle() method
let mut node = self.node.lock().unwrap(); let mut node = self.node.lock().unwrap();
for msg in decoded_input_rx { for msg in encoded_input_rx {
node.handle(self, msg); node.handle(self, msg);
} }
} }

View file

@ -52,10 +52,9 @@ impl Message {
m m
} }
/// The Maelstrom type of a Body (and hence a Message) is just a string. /// The Maelstrom type of a Body (and hence a Message) is just a string. This is for the sake of
/// This is for the sake of ease of use for doing the Gossip Glomers /// ease of use for doing the Gossip Glomers challenges; this crate is not meant to be a real
/// challenges; this crate is not meant to be a real network client /// network client framework.
/// framework.
pub fn typ(&self) -> &str { pub fn typ(&self) -> &str {
self.body.typ.as_str() self.body.typ.as_str()
} }