Compare commits

..

8 commits

Author SHA1 Message Date
Joe Ardent
a15d382d1b add readme and thanks to maelbreaker 2024-06-03 17:26:39 -07:00
Joe Ardent
2793508a93 framework works 2024-06-03 16:54:15 -07:00
Joe Ardent
6f45521660 new IO works for old challenges. 2024-06-02 11:13:31 -07:00
Joe Ardent
e6d76ba37c still deadlocking 2024-05-30 21:52:52 -07:00
Joe Ardent
c53edd921a deadlocks on writing to stdout 2024-05-30 14:36:05 -07:00
Joe Ardent
8638ffe67f add support for callback on init in the runner. 2024-05-29 14:46:09 -07:00
Joe Ardent
461087023d Make handle take Message by value.
This makes closures easier to deal with in RPC calls.
2024-05-29 13:58:11 -07:00
Joe Ardent
9e1d05983d non-working gg-counter, empty kv impl 2024-05-28 13:35:32 -07:00
11 changed files with 403 additions and 108 deletions

9
Cargo.lock generated
View file

@ -36,6 +36,15 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "gg-g_counter"
version = "0.0.1"
dependencies = [
"nebkor-maelstrom",
"rand",
"serde_json",
]
[[package]] [[package]]
name = "gg-uid" name = "gg-uid"
version = "0.0.1" version = "0.0.1"

View file

@ -1,5 +1,5 @@
[workspace] [workspace]
members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom"] members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom", "gg-g_counter"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
@ -9,3 +9,5 @@ license-file = "LICENSE.md"
[workspace.dependencies] [workspace.dependencies]
serde_json = "1" serde_json = "1"
rand = "0.8"

View file

@ -8,4 +8,4 @@ license-file.workspace = true
[dependencies] [dependencies]
serde_json.workspace = true serde_json.workspace = true
nebkor-maelstrom = { path = "../nebkor-maelstrom" } nebkor-maelstrom = { path = "../nebkor-maelstrom" }
rand = "0.8.5" rand.workspace = true

View file

@ -1,7 +1,6 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
io::BufRead, sync::{Arc, Mutex},
sync::{mpsc::channel, Arc, Mutex},
thread, thread,
time::Duration, time::Duration,
}; };
@ -10,28 +9,18 @@ use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
use rand::Rng; use rand::Rng;
fn main() { fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let input = tx.clone();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
input.send(msg).unwrap();
}
}
});
let node = BCaster::default(); let node = BCaster::default();
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node);
let runner = &runner; let runner = &runner;
let g = thread::spawn(move || loop { runner.run(Some(Box::new(on_init)));
}
fn on_init(runner: &Runner) {
let tx = runner.get_backdoor();
thread::spawn(move || loop {
let millis = rand::thread_rng().gen_range(400..=800); let millis = rand::thread_rng().gen_range(400..=800);
thread::sleep(Duration::from_millis(millis)); thread::sleep(Duration::from_millis(millis));
let body = Body::from_type("do_gossip"); let body = Body::from_type("do_gossip");
@ -41,10 +30,6 @@ fn main() {
}; };
tx.send(msg).unwrap(); tx.send(msg).unwrap();
}); });
runner.run(rx);
let _ = i.join();
let _ = g.join();
} }
#[derive(Clone, Default)] #[derive(Clone, Default)]
@ -106,7 +91,7 @@ impl BCaster {
} }
impl Node for BCaster { impl Node for BCaster {
fn handle(&mut self, runner: &Runner, req: &Message) { fn handle(&mut self, runner: &Runner, req: Message) {
let typ = req.body.typ.as_str(); let typ = req.body.typ.as_str();
let frm = req.src.as_str(); let frm = req.src.as_str();
let nid = runner.node_id(); let nid = runner.node_id();
@ -121,7 +106,7 @@ impl Node for BCaster {
if let Some(val) = val { if let Some(val) = val {
self.store.insert(val); self.store.insert(val);
let body = Body::from_type("broadcast_ok"); let body = Body::from_type("broadcast_ok");
runner.reply(req, body); runner.reply(&req, body);
} }
} }
"read" => { "read" => {
@ -131,12 +116,12 @@ impl Node for BCaster {
.collect(); .collect();
let body = Body::from_type("read_ok").with_payload(payload); let body = Body::from_type("read_ok").with_payload(payload);
runner.reply(req, body); runner.reply(&req, body);
} }
"topology" => { "topology" => {
self.topology(runner.nodes(), nid); self.topology(runner.nodes(), nid);
let body = Body::from_type("topology_ok"); let body = Body::from_type("topology_ok");
runner.reply(req, body); runner.reply(&req, body);
} }
"gossip" => { "gossip" => {
let goss = req let goss = req

View file

@ -1,44 +1,25 @@
use std::{ use std::sync::{Arc, Mutex};
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread,
};
use nebkor_maelstrom::{Body, Message, Node, Runner}; use nebkor_maelstrom::{Body, Message, Node, Runner};
struct Echo; struct Echo;
impl Node for Echo { impl Node for Echo {
fn handle(&mut self, runner: &Runner, msg: &Message) { fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = &msg.body.typ; let typ = &msg.body.typ;
if typ.as_str() == "echo" { if typ.as_str() == "echo" {
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
runner.reply(msg, body); runner.reply(&msg, body);
} }
} }
} }
fn main() { fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
tx.send(msg).unwrap();
}
}
});
let node = Echo; let node = Echo;
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node);
runner.run(rx); runner.run(None);
i.join().unwrap();
} }

12
gg-g_counter/Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[package]
name = "gg-g_counter"
edition = "2021"
version.workspace = true
authors.workspace = true
license-file.workspace = true
[dependencies]
serde_json.workspace = true
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
rand.workspace = true

85
gg-g_counter/src/main.rs Normal file
View file

@ -0,0 +1,85 @@
use std::sync::{Arc, Mutex};
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
const KEY: &str = "COUNTER";
#[derive(Clone, Default)]
struct Counter;
fn main() {
let node = Counter;
let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node);
let on_init = |rnr: &Runner| {
let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
rnr.send("seq-kv", body);
};
let on_init = Box::new(on_init);
runner.run(Some(on_init));
}
impl Node for Counter {
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
let read_payload = mk_payload(&[("key", KEY.into())]);
let read_body = Body::from_type("read").with_payload(read_payload);
let typ = req.body.typ.as_str();
//let frm = req.src.clone();
//let msg_id = req.body.msg_id.to_owned();
match typ {
"add" => {
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
let cur = runner
.rpc("seq-kv", read_body)
.recv()
.unwrap()
.body
.payload
.get("value")
.cloned()
.unwrap()
.as_i64()
.unwrap();
let cas_payload = mk_payload(&[
("key", KEY.into()),
("from", cur.into()),
("to", (cur + delta).into()),
]);
// ERRORS BE HERE
runner
.rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload))
.recv()
.unwrap();
runner.reply(&req, Body::from_type("add_ok"));
}
"read" => {
let payload = mk_payload(&[("key", KEY.into())]);
let body = Body::from_type("read").with_payload(payload);
let val = runner
.rpc("seq-kv", body)
.recv()
.unwrap()
.body
.payload
.get("value")
.cloned()
.unwrap();
let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)]));
runner.reply(&req, body);
}
_ => {
eprintln!("unknown type: {req:?}");
}
}
}
}

View file

@ -1,40 +1,21 @@
use std::{ use std::sync::{Arc, Mutex};
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread,
};
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner}; use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
fn main() { fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
tx.send(msg).unwrap();
}
}
});
let node = GenUid; let node = GenUid;
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node);
runner.run(rx); runner.run(None);
i.join().unwrap();
} }
#[derive(Clone, Default)] #[derive(Clone, Default)]
struct GenUid; struct GenUid;
impl Node for GenUid { impl Node for GenUid {
fn handle(&mut self, runner: &Runner, msg: &Message) { fn handle(&mut self, runner: &Runner, msg: Message) {
let id = runner.node_id(); let id = runner.node_id();
let mid = runner.next_msg_id(); let mid = runner.next_msg_id();
if msg.body.typ == "generate" { if msg.body.typ == "generate" {
@ -45,7 +26,7 @@ impl Node for GenUid {
.with_msg_id(mid) .with_msg_id(mid)
.with_payload(payload); .with_payload(payload);
runner.reply(msg, body); runner.reply(&msg, body);
} }
} }
} }

View file

@ -0,0 +1,69 @@
# A synchronous and simple Maelstrom crate
`nebkor-maelstrom` is a lean and simple synchronous library for writing
[Maelstrom](https://github.com/jepsen-io/maelstrom/tree/0186f398f96564a0453dda97c07daeac67c3d8d7)-compatible
distributed actors. It has three dependencies:
- serde
- serde_json
- serde_repr
For a simple example, see the [gg-echo](../gg-echo/src/main.rs) program:
``` rust
use std::sync::{Arc, Mutex};
use nebkor_maelstrom::{Body, Message, Node, Runner};
struct Echo;
impl Node for Echo {
fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = &msg.body.typ;
if typ.as_str() == "echo" {
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
runner.reply(&msg, body);
}
}
}
fn main() {
let node = Echo;
let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node);
runner.run(None);
}
```
## How to use
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
`send`, `reply`, and `rpc`.
In your main function, instantiate that struct and wrap it in an `Arc<Mutex<>>`, then pass that into
`Runner::new()` to get a Runner. The `run()` method takes an optional closure that will be run when
the `init` Message is received; see the [broadcast](../gg-broadcast/src/main.rs) program for an
example of that, where it spawns a thread to send periodic messages to the node.
## Design considerations
I wanted the client code to be as simple as possible, with the least amount of boilerplate. Using
`&mut self` as the receiver for the `handle()` method lets you easily mutate state in your node if
you need to, without the ceremony of `Rc<Mutex<>>` and the like. Eschewing `async` results in an
order of magnitude fewer dependencies, and the entire workspace (crate and clients) can be compiled
in a couple seconds.
## Acknowledgments
I straight-up stole the design of the IO/network system from
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
call. Thanks!
## TODO
- add error handling.

View file

@ -0,0 +1,51 @@
/*
use std::{rc::Rc, sync::Arc};
use serde_json::Value;
use crate::{mk_payload, Body, Message, Result, Runner};
#[derive(Debug, Default, Clone)]
pub struct Kv {
pub service: String,
}
impl Kv {
pub fn seq() -> Self {
Kv {
service: "seq-kv".to_string(),
}
}
pub fn lin() -> Self {
Kv {
service: "lin-kv".to_string(),
}
}
pub fn lww() -> Self {
Kv {
service: "lww-kv".to_string(),
}
}
pub fn read(&self, runner: &Runner, key: &str) -> Result<Value> {
todo!()
}
pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> {
todo!()
}
pub fn cas(
&self,
runner: &Runner,
key: &str,
from: Value,
to: Value,
create: bool,
) -> Result<()> {
todo!()
}
}
*/

View file

@ -1,66 +1,174 @@
use std::{ use std::{
io::{Stdout, Write}, collections::HashMap,
io::{BufRead, Write},
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicU64, AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, Mutex, OnceLock, Arc, Mutex, OnceLock,
}, },
thread::{self},
}; };
pub mod protocol; pub mod protocol;
pub use protocol::{Body, Message}; use protocol::ErrorCode;
pub use protocol::{Body, Message, Payload};
use serde_json::Value;
pub mod kv;
pub type DynNode = Arc<Mutex<dyn Node>>; pub type DynNode = Arc<Mutex<dyn Node>>;
pub type OnInit = Box<dyn Fn(&Runner)>;
pub type Result<T> = std::result::Result<T, ErrorCode>;
pub type RpcPromise = Receiver<Message>;
static MSG_ID: AtomicU64 = AtomicU64::new(0);
pub trait Node { pub trait Node {
fn handle(&mut self, runner: &Runner, msg: &Message); fn handle(&mut self, runner: &Runner, msg: Message);
}
#[derive(Debug, Clone)]
pub struct Network {
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
node_output_tx: Sender<Message>,
}
impl Network {
pub fn new() -> (Self, Receiver<Message>) {
let (node_output_tx, node_output_rx) = channel();
let net = Self {
node_output_tx,
promises: Arc::new(Mutex::new(HashMap::default())),
};
(net, node_output_rx)
}
pub fn send(&self, msg: Message) {
self.node_output_tx.send(msg).unwrap();
}
pub fn rpc(&self, msg: Message) -> RpcPromise {
let (tx, rx) = channel();
{
let msg_id = msg.body.msg_id;
let mut g = self.promises.lock().unwrap();
g.insert(msg_id, tx);
}
self.send(msg);
rx
}
} }
pub struct Runner { pub struct Runner {
msg_id: AtomicU64,
node: DynNode, node: DynNode,
node_id: OnceLock<String>, node_id: OnceLock<String>,
nodes: OnceLock<Vec<String>>, nodes: OnceLock<Vec<String>>,
steps: AtomicUsize, network: OnceLock<Network>,
output: Arc<Mutex<Stdout>>, backdoor: OnceLock<Sender<Message>>,
steps: Arc<AtomicUsize>,
} }
impl Runner { impl Runner {
pub fn new(node: DynNode, output: Stdout) -> Self { pub fn new(node: DynNode) -> Self {
Runner { Runner {
node, node,
msg_id: AtomicU64::new(1),
nodes: OnceLock::new(), nodes: OnceLock::new(),
node_id: OnceLock::new(), node_id: OnceLock::new(),
steps: AtomicUsize::new(0), network: OnceLock::new(),
output: Arc::new(Mutex::new(output)), backdoor: OnceLock::new(),
steps: Arc::new(AtomicUsize::new(0)),
} }
} }
pub fn run(&self, input: std::sync::mpsc::Receiver<Message>) { pub fn run(&self, on_init: Option<OnInit>) {
for msg in input.iter() { let (stdin_tx, stdin_rx) = channel();
self.steps.fetch_add(1, Ordering::SeqCst); let (stdout_tx, stdout_rx) = channel();
let typ = &msg.body.typ;
if let "init" = typ.as_str() { thread::spawn(move || {
let stdin = std::io::stdin().lock().lines();
for line in stdin {
stdin_tx.send(line.unwrap()).unwrap();
}
});
thread::spawn(move || {
let mut stdout = std::io::stdout().lock();
for msg in stdout_rx {
writeln!(&mut stdout, "{msg}").unwrap();
}
});
self.run_internal(stdout_tx, stdin_rx, on_init);
}
fn run_internal(
&self,
stdout_tx: Sender<String>,
stdin_rx: Receiver<String>,
on_init: Option<OnInit>,
) {
let (network, node_receiver) = Network::new();
let _ = self.network.get_or_init(|| network.clone());
self.run_output(stdout_tx, node_receiver);
self.run_input(stdin_rx, network, on_init);
}
fn run_input(&self, stdin_rx: Receiver<String>, network: Network, on_init: Option<OnInit>) {
let (json_tx, json_rx) = channel();
let _ = self.backdoor.get_or_init(|| json_tx.clone());
thread::spawn(move || {
for line in stdin_rx {
let msg: Message = serde_json::from_str(&line).unwrap();
let irt = msg.body.in_reply_to;
if let Some(tx) = network.promises.lock().unwrap().remove(&irt) {
tx.send(msg).unwrap();
} else {
json_tx.send(msg).unwrap();
}
}
});
for msg in json_rx {
let mut node = self.node.lock().unwrap();
if msg.body.typ.as_str() == "init" {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
self.reply(&msg, body); self.reply(&msg, body);
if let Some(ref on_init) = on_init {
on_init(self);
}
} else { } else {
let mut n = self.node.lock().unwrap(); node.handle(self, msg);
n.handle(self, &msg);
} }
} }
} }
fn run_output(&self, stdout_tx: Sender<String>, node_output_rx: Receiver<Message>) {
thread::spawn(move || {
while let Ok(msg) = node_output_rx.recv() {
let msg = serde_json::to_string(&msg).unwrap();
stdout_tx.send(msg).unwrap();
}
});
}
pub fn get_backdoor(&self) -> Sender<Message> {
self.backdoor.get().unwrap().clone()
}
pub fn node_id(&self) -> String { pub fn node_id(&self) -> String {
self.node_id.get().cloned().unwrap_or_default() self.node_id.get().cloned().unwrap_or_default()
} }
pub fn next_msg_id(&self) -> u64 { pub fn next_msg_id(&self) -> u64 {
self.msg_id.fetch_add(1, Ordering::SeqCst) MSG_ID.fetch_add(1, Ordering::SeqCst)
} }
pub fn cur_msg_id(&self) -> u64 { pub fn cur_msg_id(&self) -> u64 {
self.msg_id.load(Ordering::SeqCst) MSG_ID.load(Ordering::SeqCst)
} }
pub fn nodes(&self) -> &[String] { pub fn nodes(&self) -> &[String] {
@ -90,18 +198,18 @@ impl Runner {
.unwrap() .unwrap()
.iter() .iter()
.map(|s| s.as_str().unwrap().to_string()) .map(|s| s.as_str().unwrap().to_string())
.collect::<Vec<_>>(); .collect();
let _ = self.node_id.get_or_init(|| node_id.to_owned()); let _ = self.node_id.get_or_init(|| node_id);
let _ = self.nodes.get_or_init(|| nodes.to_vec()); let _ = self.nodes.get_or_init(|| nodes);
} }
pub fn reply(&self, req: &Message, body: Body) { pub fn reply(&self, req: &Message, body: Body) {
let mut body = body; let mut body = body;
let dest = req.src.clone(); let dest = req.src.as_str();
let in_reply_to = req.body.msg_id; let in_reply_to = req.body.msg_id;
body.in_reply_to = in_reply_to; body.in_reply_to = in_reply_to;
self.send(&dest, body); self.send(dest, body);
} }
pub fn send(&self, dest: &str, body: Body) { pub fn send(&self, dest: &str, body: Body) {
@ -114,14 +222,26 @@ impl Runner {
dest: dest.to_string(), dest: dest.to_string(),
body, body,
}; };
let msg = serde_json::to_string(&msg).unwrap(); self.network.get().unwrap().send(msg);
self.writeln(&msg);
} }
fn writeln(&self, msg: &str) { pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
let mut out = self.output.lock().unwrap(); let mut body = body;
let msg = format!("{msg}\n"); if body.msg_id == 0 {
out.write_all(msg.as_bytes()).unwrap(); body.msg_id = self.next_msg_id();
out.flush().unwrap(); }
let msg = Message {
src: self.node_id().to_string(),
dest: dest.to_string(),
body,
};
self.network.get().unwrap().rpc(msg)
} }
} }
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
payload
.iter()
.map(|p| (p.0.to_string(), p.1.clone()))
.collect()
}