Compare commits
No commits in common. "a15d382d1b8c3980ae8cb6ebf6c88993c4026c62" and "6ed80d17fb4d60ab9ad07018eb22d15c33c94ca0" have entirely different histories.
a15d382d1b
...
6ed80d17fb
11 changed files with 108 additions and 403 deletions
9
Cargo.lock
generated
9
Cargo.lock
generated
|
@ -36,15 +36,6 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "gg-g_counter"
|
|
||||||
version = "0.0.1"
|
|
||||||
dependencies = [
|
|
||||||
"nebkor-maelstrom",
|
|
||||||
"rand",
|
|
||||||
"serde_json",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "gg-uid"
|
name = "gg-uid"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom", "gg-g_counter"]
|
members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
@ -9,5 +9,3 @@ license-file = "LICENSE.md"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
rand = "0.8"
|
|
||||||
|
|
||||||
|
|
|
@ -8,4 +8,4 @@ license-file.workspace = true
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
||||||
rand.workspace = true
|
rand = "0.8.5"
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
sync::{Arc, Mutex},
|
io::BufRead,
|
||||||
|
sync::{mpsc::channel, Arc, Mutex},
|
||||||
thread,
|
thread,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
@ -9,18 +10,28 @@ use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let out = std::io::stdout();
|
||||||
|
let std_in = Arc::new(std::io::stdin());
|
||||||
|
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let input = tx.clone();
|
||||||
|
|
||||||
|
let i = thread::spawn(move || {
|
||||||
|
let g = std_in.lock();
|
||||||
|
for line in g.lines().map_while(Result::ok) {
|
||||||
|
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
||||||
|
input.send(msg).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let node = BCaster::default();
|
let node = BCaster::default();
|
||||||
let node = Arc::new(Mutex::new(node));
|
let node = Arc::new(Mutex::new(node));
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node, out);
|
||||||
let runner = &runner;
|
let runner = &runner;
|
||||||
|
|
||||||
runner.run(Some(Box::new(on_init)));
|
let g = thread::spawn(move || loop {
|
||||||
}
|
|
||||||
|
|
||||||
fn on_init(runner: &Runner) {
|
|
||||||
let tx = runner.get_backdoor();
|
|
||||||
thread::spawn(move || loop {
|
|
||||||
let millis = rand::thread_rng().gen_range(400..=800);
|
let millis = rand::thread_rng().gen_range(400..=800);
|
||||||
thread::sleep(Duration::from_millis(millis));
|
thread::sleep(Duration::from_millis(millis));
|
||||||
let body = Body::from_type("do_gossip");
|
let body = Body::from_type("do_gossip");
|
||||||
|
@ -30,6 +41,10 @@ fn on_init(runner: &Runner) {
|
||||||
};
|
};
|
||||||
tx.send(msg).unwrap();
|
tx.send(msg).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
runner.run(rx);
|
||||||
|
let _ = i.join();
|
||||||
|
let _ = g.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
|
@ -91,7 +106,7 @@ impl BCaster {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Node for BCaster {
|
impl Node for BCaster {
|
||||||
fn handle(&mut self, runner: &Runner, req: Message) {
|
fn handle(&mut self, runner: &Runner, req: &Message) {
|
||||||
let typ = req.body.typ.as_str();
|
let typ = req.body.typ.as_str();
|
||||||
let frm = req.src.as_str();
|
let frm = req.src.as_str();
|
||||||
let nid = runner.node_id();
|
let nid = runner.node_id();
|
||||||
|
@ -106,7 +121,7 @@ impl Node for BCaster {
|
||||||
if let Some(val) = val {
|
if let Some(val) = val {
|
||||||
self.store.insert(val);
|
self.store.insert(val);
|
||||||
let body = Body::from_type("broadcast_ok");
|
let body = Body::from_type("broadcast_ok");
|
||||||
runner.reply(&req, body);
|
runner.reply(req, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"read" => {
|
"read" => {
|
||||||
|
@ -116,12 +131,12 @@ impl Node for BCaster {
|
||||||
.collect();
|
.collect();
|
||||||
let body = Body::from_type("read_ok").with_payload(payload);
|
let body = Body::from_type("read_ok").with_payload(payload);
|
||||||
|
|
||||||
runner.reply(&req, body);
|
runner.reply(req, body);
|
||||||
}
|
}
|
||||||
"topology" => {
|
"topology" => {
|
||||||
self.topology(runner.nodes(), nid);
|
self.topology(runner.nodes(), nid);
|
||||||
let body = Body::from_type("topology_ok");
|
let body = Body::from_type("topology_ok");
|
||||||
runner.reply(&req, body);
|
runner.reply(req, body);
|
||||||
}
|
}
|
||||||
"gossip" => {
|
"gossip" => {
|
||||||
let goss = req
|
let goss = req
|
||||||
|
|
|
@ -1,25 +1,44 @@
|
||||||
use std::sync::{Arc, Mutex};
|
use std::{
|
||||||
|
io::BufRead,
|
||||||
|
sync::{mpsc::channel, Arc, Mutex},
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||||
|
|
||||||
struct Echo;
|
struct Echo;
|
||||||
|
|
||||||
impl Node for Echo {
|
impl Node for Echo {
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message) {
|
fn handle(&mut self, runner: &Runner, msg: &Message) {
|
||||||
let typ = &msg.body.typ;
|
let typ = &msg.body.typ;
|
||||||
if typ.as_str() == "echo" {
|
if typ.as_str() == "echo" {
|
||||||
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
||||||
runner.reply(&msg, body);
|
runner.reply(msg, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let out = std::io::stdout();
|
||||||
|
let std_in = Arc::new(std::io::stdin());
|
||||||
|
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
|
||||||
|
let i = thread::spawn(move || {
|
||||||
|
let g = std_in.lock();
|
||||||
|
for line in g.lines().map_while(Result::ok) {
|
||||||
|
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let node = Echo;
|
let node = Echo;
|
||||||
|
|
||||||
let node = Arc::new(Mutex::new(node));
|
let node = Arc::new(Mutex::new(node));
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node, out);
|
||||||
|
|
||||||
runner.run(None);
|
runner.run(rx);
|
||||||
|
i.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "gg-g_counter"
|
|
||||||
edition = "2021"
|
|
||||||
version.workspace = true
|
|
||||||
authors.workspace = true
|
|
||||||
license-file.workspace = true
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
serde_json.workspace = true
|
|
||||||
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
|
|
||||||
rand.workspace = true
|
|
||||||
|
|
|
@ -1,85 +0,0 @@
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
|
||||||
|
|
||||||
const KEY: &str = "COUNTER";
|
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
|
||||||
struct Counter;
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let node = Counter;
|
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
|
||||||
|
|
||||||
let on_init = |rnr: &Runner| {
|
|
||||||
let payload = mk_payload(&[
|
|
||||||
("key", KEY.into()),
|
|
||||||
("from", 0i64.into()),
|
|
||||||
("to", 0i64.into()),
|
|
||||||
("create_if_not_exists", true.into()),
|
|
||||||
]);
|
|
||||||
let body = Body::from_type("cas").with_payload(payload);
|
|
||||||
rnr.send("seq-kv", body);
|
|
||||||
};
|
|
||||||
|
|
||||||
let on_init = Box::new(on_init);
|
|
||||||
|
|
||||||
runner.run(Some(on_init));
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Node for Counter {
|
|
||||||
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
|
|
||||||
let read_payload = mk_payload(&[("key", KEY.into())]);
|
|
||||||
let read_body = Body::from_type("read").with_payload(read_payload);
|
|
||||||
let typ = req.body.typ.as_str();
|
|
||||||
//let frm = req.src.clone();
|
|
||||||
//let msg_id = req.body.msg_id.to_owned();
|
|
||||||
match typ {
|
|
||||||
"add" => {
|
|
||||||
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
|
|
||||||
let cur = runner
|
|
||||||
.rpc("seq-kv", read_body)
|
|
||||||
.recv()
|
|
||||||
.unwrap()
|
|
||||||
.body
|
|
||||||
.payload
|
|
||||||
.get("value")
|
|
||||||
.cloned()
|
|
||||||
.unwrap()
|
|
||||||
.as_i64()
|
|
||||||
.unwrap();
|
|
||||||
let cas_payload = mk_payload(&[
|
|
||||||
("key", KEY.into()),
|
|
||||||
("from", cur.into()),
|
|
||||||
("to", (cur + delta).into()),
|
|
||||||
]);
|
|
||||||
// ERRORS BE HERE
|
|
||||||
runner
|
|
||||||
.rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload))
|
|
||||||
.recv()
|
|
||||||
.unwrap();
|
|
||||||
runner.reply(&req, Body::from_type("add_ok"));
|
|
||||||
}
|
|
||||||
"read" => {
|
|
||||||
let payload = mk_payload(&[("key", KEY.into())]);
|
|
||||||
let body = Body::from_type("read").with_payload(payload);
|
|
||||||
let val = runner
|
|
||||||
.rpc("seq-kv", body)
|
|
||||||
.recv()
|
|
||||||
.unwrap()
|
|
||||||
.body
|
|
||||||
.payload
|
|
||||||
.get("value")
|
|
||||||
.cloned()
|
|
||||||
.unwrap();
|
|
||||||
let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)]));
|
|
||||||
runner.reply(&req, body);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
eprintln!("unknown type: {req:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,21 +1,40 @@
|
||||||
use std::sync::{Arc, Mutex};
|
use std::{
|
||||||
|
io::BufRead,
|
||||||
|
sync::{mpsc::channel, Arc, Mutex},
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let out = std::io::stdout();
|
||||||
|
let std_in = Arc::new(std::io::stdin());
|
||||||
|
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
|
||||||
|
let i = thread::spawn(move || {
|
||||||
|
let g = std_in.lock();
|
||||||
|
for line in g.lines().map_while(Result::ok) {
|
||||||
|
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let node = GenUid;
|
let node = GenUid;
|
||||||
let node = Arc::new(Mutex::new(node));
|
let node = Arc::new(Mutex::new(node));
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node, out);
|
||||||
|
|
||||||
runner.run(None);
|
runner.run(rx);
|
||||||
|
i.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
struct GenUid;
|
struct GenUid;
|
||||||
|
|
||||||
impl Node for GenUid {
|
impl Node for GenUid {
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message) {
|
fn handle(&mut self, runner: &Runner, msg: &Message) {
|
||||||
let id = runner.node_id();
|
let id = runner.node_id();
|
||||||
let mid = runner.next_msg_id();
|
let mid = runner.next_msg_id();
|
||||||
if msg.body.typ == "generate" {
|
if msg.body.typ == "generate" {
|
||||||
|
@ -26,7 +45,7 @@ impl Node for GenUid {
|
||||||
.with_msg_id(mid)
|
.with_msg_id(mid)
|
||||||
.with_payload(payload);
|
.with_payload(payload);
|
||||||
|
|
||||||
runner.reply(&msg, body);
|
runner.reply(msg, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,69 +0,0 @@
|
||||||
# A synchronous and simple Maelstrom crate
|
|
||||||
|
|
||||||
`nebkor-maelstreom` is a lean and simple synchronous library for writing
|
|
||||||
[Maelstrom](https://github.com/jepsen-io/maelstrom/tree/0186f398f96564a0453dda97c07daeac67c3d8d7)-compatible
|
|
||||||
distributed actors. It has three dependencies:
|
|
||||||
|
|
||||||
- serde
|
|
||||||
- serde_json
|
|
||||||
- serde_repr
|
|
||||||
|
|
||||||
For a simple example, see the [gg-ehco](../gg-echo/src/main.rs) program:
|
|
||||||
|
|
||||||
``` rust
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
|
||||||
|
|
||||||
struct Echo;
|
|
||||||
|
|
||||||
impl Node for Echo {
|
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message) {
|
|
||||||
let typ = &msg.body.typ;
|
|
||||||
if typ.as_str() == "echo" {
|
|
||||||
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
|
||||||
runner.reply(&msg, body);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let node = Echo;
|
|
||||||
|
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
|
||||||
|
|
||||||
runner.run(None);
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## How to use
|
|
||||||
|
|
||||||
Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a single method,
|
|
||||||
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
|
||||||
`send`, `reply`, and `rpc`.
|
|
||||||
|
|
||||||
In your main function, instantiate that struct and wrap it in an `Arc<Mutex<>>`, then pass that into
|
|
||||||
`Runner::new()` to get a Runner. The `run()` method takes an optional closure that will be run when
|
|
||||||
the `init` Message is received; see the [broadcast](../gg-broadcast/src/main.rs) program for an
|
|
||||||
example of that, where it spawns a thread to send periodic messages to the node.
|
|
||||||
|
|
||||||
## Design considerations
|
|
||||||
|
|
||||||
I wanted the client code to be as simple as possible, with the least amount of boilerplate. Using
|
|
||||||
`&mut self` as the receiver for the `handle()` method lets you easily mutate state in your node if
|
|
||||||
you need to, without the ceremony of `Rc<Mutex<>>` and the like. Eschewing `async` results in an
|
|
||||||
order of magnitude fewer dependencies, and the entire workspace (crate and clients) can be compiled
|
|
||||||
in a couple seconds.
|
|
||||||
|
|
||||||
## Acknowledgments
|
|
||||||
|
|
||||||
I straight-up stole the design of the IO/network system from
|
|
||||||
[Maelbreaker](https://github.com/rafibayer/maelbreaker/), which allowed me to get a working RPC
|
|
||||||
call. Thanks!
|
|
||||||
|
|
||||||
|
|
||||||
## TODO
|
|
||||||
|
|
||||||
- add error handling.
|
|
|
@ -1,51 +0,0 @@
|
||||||
/*
|
|
||||||
use std::{rc::Rc, sync::Arc};
|
|
||||||
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use crate::{mk_payload, Body, Message, Result, Runner};
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
|
||||||
pub struct Kv {
|
|
||||||
pub service: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Kv {
|
|
||||||
pub fn seq() -> Self {
|
|
||||||
Kv {
|
|
||||||
service: "seq-kv".to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn lin() -> Self {
|
|
||||||
Kv {
|
|
||||||
service: "lin-kv".to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn lww() -> Self {
|
|
||||||
Kv {
|
|
||||||
service: "lww-kv".to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read(&self, runner: &Runner, key: &str) -> Result<Value> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cas(
|
|
||||||
&self,
|
|
||||||
runner: &Runner,
|
|
||||||
key: &str,
|
|
||||||
from: Value,
|
|
||||||
to: Value,
|
|
||||||
create: bool,
|
|
||||||
) -> Result<()> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
|
@ -1,174 +1,66 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
io::{Stdout, Write},
|
||||||
io::{BufRead, Write},
|
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||||
mpsc::{channel, Receiver, Sender},
|
|
||||||
Arc, Mutex, OnceLock,
|
Arc, Mutex, OnceLock,
|
||||||
},
|
},
|
||||||
thread::{self},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
use protocol::ErrorCode;
|
pub use protocol::{Body, Message};
|
||||||
pub use protocol::{Body, Message, Payload};
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
pub mod kv;
|
|
||||||
|
|
||||||
pub type DynNode = Arc<Mutex<dyn Node>>;
|
pub type DynNode = Arc<Mutex<dyn Node>>;
|
||||||
pub type OnInit = Box<dyn Fn(&Runner)>;
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, ErrorCode>;
|
|
||||||
|
|
||||||
pub type RpcPromise = Receiver<Message>;
|
|
||||||
|
|
||||||
static MSG_ID: AtomicU64 = AtomicU64::new(0);
|
|
||||||
|
|
||||||
pub trait Node {
|
pub trait Node {
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message);
|
fn handle(&mut self, runner: &Runner, msg: &Message);
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Network {
|
|
||||||
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
|
||||||
node_output_tx: Sender<Message>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Network {
|
|
||||||
pub fn new() -> (Self, Receiver<Message>) {
|
|
||||||
let (node_output_tx, node_output_rx) = channel();
|
|
||||||
let net = Self {
|
|
||||||
node_output_tx,
|
|
||||||
promises: Arc::new(Mutex::new(HashMap::default())),
|
|
||||||
};
|
|
||||||
(net, node_output_rx)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send(&self, msg: Message) {
|
|
||||||
self.node_output_tx.send(msg).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn rpc(&self, msg: Message) -> RpcPromise {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
{
|
|
||||||
let msg_id = msg.body.msg_id;
|
|
||||||
let mut g = self.promises.lock().unwrap();
|
|
||||||
g.insert(msg_id, tx);
|
|
||||||
}
|
|
||||||
self.send(msg);
|
|
||||||
rx
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Runner {
|
pub struct Runner {
|
||||||
|
msg_id: AtomicU64,
|
||||||
node: DynNode,
|
node: DynNode,
|
||||||
node_id: OnceLock<String>,
|
node_id: OnceLock<String>,
|
||||||
nodes: OnceLock<Vec<String>>,
|
nodes: OnceLock<Vec<String>>,
|
||||||
network: OnceLock<Network>,
|
steps: AtomicUsize,
|
||||||
backdoor: OnceLock<Sender<Message>>,
|
output: Arc<Mutex<Stdout>>,
|
||||||
steps: Arc<AtomicUsize>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runner {
|
impl Runner {
|
||||||
pub fn new(node: DynNode) -> Self {
|
pub fn new(node: DynNode, output: Stdout) -> Self {
|
||||||
Runner {
|
Runner {
|
||||||
node,
|
node,
|
||||||
|
msg_id: AtomicU64::new(1),
|
||||||
nodes: OnceLock::new(),
|
nodes: OnceLock::new(),
|
||||||
node_id: OnceLock::new(),
|
node_id: OnceLock::new(),
|
||||||
network: OnceLock::new(),
|
steps: AtomicUsize::new(0),
|
||||||
backdoor: OnceLock::new(),
|
output: Arc::new(Mutex::new(output)),
|
||||||
steps: Arc::new(AtomicUsize::new(0)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&self, on_init: Option<OnInit>) {
|
pub fn run(&self, input: std::sync::mpsc::Receiver<Message>) {
|
||||||
let (stdin_tx, stdin_rx) = channel();
|
for msg in input.iter() {
|
||||||
let (stdout_tx, stdout_rx) = channel();
|
self.steps.fetch_add(1, Ordering::SeqCst);
|
||||||
|
let typ = &msg.body.typ;
|
||||||
thread::spawn(move || {
|
if let "init" = typ.as_str() {
|
||||||
let stdin = std::io::stdin().lock().lines();
|
|
||||||
for line in stdin {
|
|
||||||
stdin_tx.send(line.unwrap()).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
let mut stdout = std::io::stdout().lock();
|
|
||||||
for msg in stdout_rx {
|
|
||||||
writeln!(&mut stdout, "{msg}").unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
self.run_internal(stdout_tx, stdin_rx, on_init);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_internal(
|
|
||||||
&self,
|
|
||||||
stdout_tx: Sender<String>,
|
|
||||||
stdin_rx: Receiver<String>,
|
|
||||||
on_init: Option<OnInit>,
|
|
||||||
) {
|
|
||||||
let (network, node_receiver) = Network::new();
|
|
||||||
let _ = self.network.get_or_init(|| network.clone());
|
|
||||||
self.run_output(stdout_tx, node_receiver);
|
|
||||||
self.run_input(stdin_rx, network, on_init);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_input(&self, stdin_rx: Receiver<String>, network: Network, on_init: Option<OnInit>) {
|
|
||||||
let (json_tx, json_rx) = channel();
|
|
||||||
let _ = self.backdoor.get_or_init(|| json_tx.clone());
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
for line in stdin_rx {
|
|
||||||
let msg: Message = serde_json::from_str(&line).unwrap();
|
|
||||||
let irt = msg.body.in_reply_to;
|
|
||||||
if let Some(tx) = network.promises.lock().unwrap().remove(&irt) {
|
|
||||||
tx.send(msg).unwrap();
|
|
||||||
} else {
|
|
||||||
json_tx.send(msg).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for msg in json_rx {
|
|
||||||
let mut node = self.node.lock().unwrap();
|
|
||||||
if msg.body.typ.as_str() == "init" {
|
|
||||||
self.init(&msg);
|
self.init(&msg);
|
||||||
let body = Body::from_type("init_ok");
|
let body = Body::from_type("init_ok");
|
||||||
self.reply(&msg, body);
|
self.reply(&msg, body);
|
||||||
if let Some(ref on_init) = on_init {
|
|
||||||
on_init(self);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
node.handle(self, msg);
|
let mut n = self.node.lock().unwrap();
|
||||||
|
n.handle(self, &msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_output(&self, stdout_tx: Sender<String>, node_output_rx: Receiver<Message>) {
|
|
||||||
thread::spawn(move || {
|
|
||||||
while let Ok(msg) = node_output_rx.recv() {
|
|
||||||
let msg = serde_json::to_string(&msg).unwrap();
|
|
||||||
stdout_tx.send(msg).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_backdoor(&self) -> Sender<Message> {
|
|
||||||
self.backdoor.get().unwrap().clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn node_id(&self) -> String {
|
pub fn node_id(&self) -> String {
|
||||||
self.node_id.get().cloned().unwrap_or_default()
|
self.node_id.get().cloned().unwrap_or_default()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next_msg_id(&self) -> u64 {
|
pub fn next_msg_id(&self) -> u64 {
|
||||||
MSG_ID.fetch_add(1, Ordering::SeqCst)
|
self.msg_id.fetch_add(1, Ordering::SeqCst)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cur_msg_id(&self) -> u64 {
|
pub fn cur_msg_id(&self) -> u64 {
|
||||||
MSG_ID.load(Ordering::SeqCst)
|
self.msg_id.load(Ordering::SeqCst)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn nodes(&self) -> &[String] {
|
pub fn nodes(&self) -> &[String] {
|
||||||
|
@ -198,18 +90,18 @@ impl Runner {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| s.as_str().unwrap().to_string())
|
.map(|s| s.as_str().unwrap().to_string())
|
||||||
.collect();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let _ = self.node_id.get_or_init(|| node_id);
|
let _ = self.node_id.get_or_init(|| node_id.to_owned());
|
||||||
let _ = self.nodes.get_or_init(|| nodes);
|
let _ = self.nodes.get_or_init(|| nodes.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reply(&self, req: &Message, body: Body) {
|
pub fn reply(&self, req: &Message, body: Body) {
|
||||||
let mut body = body;
|
let mut body = body;
|
||||||
let dest = req.src.as_str();
|
let dest = req.src.clone();
|
||||||
let in_reply_to = req.body.msg_id;
|
let in_reply_to = req.body.msg_id;
|
||||||
body.in_reply_to = in_reply_to;
|
body.in_reply_to = in_reply_to;
|
||||||
self.send(dest, body);
|
self.send(&dest, body);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send(&self, dest: &str, body: Body) {
|
pub fn send(&self, dest: &str, body: Body) {
|
||||||
|
@ -222,26 +114,14 @@ impl Runner {
|
||||||
dest: dest.to_string(),
|
dest: dest.to_string(),
|
||||||
body,
|
body,
|
||||||
};
|
};
|
||||||
self.network.get().unwrap().send(msg);
|
let msg = serde_json::to_string(&msg).unwrap();
|
||||||
|
self.writeln(&msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
fn writeln(&self, msg: &str) {
|
||||||
let mut body = body;
|
let mut out = self.output.lock().unwrap();
|
||||||
if body.msg_id == 0 {
|
let msg = format!("{msg}\n");
|
||||||
body.msg_id = self.next_msg_id();
|
out.write_all(msg.as_bytes()).unwrap();
|
||||||
}
|
out.flush().unwrap();
|
||||||
let msg = Message {
|
|
||||||
src: self.node_id().to_string(),
|
|
||||||
dest: dest.to_string(),
|
|
||||||
body,
|
|
||||||
};
|
|
||||||
self.network.get().unwrap().rpc(msg)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
|
|
||||||
payload
|
|
||||||
.iter()
|
|
||||||
.map(|p| (p.0.to_string(), p.1.clone()))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue