Compare commits
2 commits
e6d76ba37c
...
2793508a93
Author | SHA1 | Date | |
---|---|---|---|
|
2793508a93 | ||
|
6f45521660 |
5 changed files with 184 additions and 184 deletions
|
@ -1,7 +1,6 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
io::BufRead,
|
||||
sync::{mpsc::channel, Arc, Mutex},
|
||||
sync::{Arc, Mutex},
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
@ -10,28 +9,18 @@ use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
|||
use rand::Rng;
|
||||
|
||||
fn main() {
|
||||
let out = std::io::stdout();
|
||||
let std_in = Arc::new(std::io::stdin());
|
||||
|
||||
let (tx, rx) = channel();
|
||||
let input = tx.clone();
|
||||
|
||||
let i = thread::spawn(move || {
|
||||
let g = std_in.lock();
|
||||
for line in g.lines().map_while(Result::ok) {
|
||||
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
||||
input.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let node = BCaster::default();
|
||||
let node = Arc::new(Mutex::new(node));
|
||||
|
||||
let runner = Runner::new(node, out);
|
||||
let runner = Runner::new(node);
|
||||
let runner = &runner;
|
||||
|
||||
let g = thread::spawn(move || loop {
|
||||
runner.run(Some(Box::new(on_init)));
|
||||
}
|
||||
|
||||
fn on_init(runner: &Runner) {
|
||||
let tx = runner.get_backdoor();
|
||||
thread::spawn(move || loop {
|
||||
let millis = rand::thread_rng().gen_range(400..=800);
|
||||
thread::sleep(Duration::from_millis(millis));
|
||||
let body = Body::from_type("do_gossip");
|
||||
|
@ -41,10 +30,6 @@ fn main() {
|
|||
};
|
||||
tx.send(msg).unwrap();
|
||||
});
|
||||
|
||||
runner.run(rx, None);
|
||||
let _ = i.join();
|
||||
let _ = g.join();
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
|
|
|
@ -1,8 +1,4 @@
|
|||
use std::{
|
||||
io::BufRead,
|
||||
sync::{mpsc::channel, Arc, Mutex},
|
||||
thread,
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||
|
||||
|
@ -19,26 +15,11 @@ impl Node for Echo {
|
|||
}
|
||||
|
||||
fn main() {
|
||||
let out = std::io::stdout();
|
||||
let std_in = Arc::new(std::io::stdin());
|
||||
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let i = thread::spawn(move || {
|
||||
let g = std_in.lock();
|
||||
for line in g.lines().map_while(Result::ok) {
|
||||
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
||||
tx.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let node = Echo;
|
||||
|
||||
let node = Arc::new(Mutex::new(node));
|
||||
|
||||
let runner = Runner::new(node, out);
|
||||
let runner = Runner::new(node);
|
||||
|
||||
runner.run(rx, None);
|
||||
i.join().unwrap();
|
||||
runner.run(None);
|
||||
}
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
use std::{
|
||||
rc::Rc,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner};
|
||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
||||
|
||||
const KEY: &str = "COUNTER";
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct Counter;
|
||||
|
||||
fn main() {
|
||||
let node = Counter;
|
||||
let node = Arc::new(Mutex::new(node));
|
||||
|
||||
let runner = Rc::new(Runner::new(node));
|
||||
|
||||
let (i, _, rx) = mk_stdin();
|
||||
let runner = Runner::new(node);
|
||||
|
||||
let on_init = |rnr: &Runner| {
|
||||
let payload = mk_payload(&[
|
||||
|
@ -26,62 +26,56 @@ fn main() {
|
|||
|
||||
let on_init = Box::new(on_init);
|
||||
|
||||
runner.run(rx, Some(on_init));
|
||||
i.join().unwrap();
|
||||
runner.run(Some(on_init));
|
||||
}
|
||||
|
||||
const KEY: &str = "COUNTER";
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct Counter;
|
||||
|
||||
impl Node for Counter {
|
||||
fn handle<'slf>(&'slf mut self, runner: &'slf Rc<Runner>, req: Message) {
|
||||
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
|
||||
let read_payload = mk_payload(&[("key", KEY.into())]);
|
||||
let read_body = Body::from_type("read").with_payload(read_payload);
|
||||
let typ = req.body.typ.as_str();
|
||||
let frm = req.src.clone();
|
||||
let msg_id = req.body.msg_id.to_owned();
|
||||
//let frm = req.src.clone();
|
||||
//let msg_id = req.body.msg_id.to_owned();
|
||||
match typ {
|
||||
"add" => {
|
||||
let read_runner = runner.clone();
|
||||
let cas_runner = runner.clone();
|
||||
let add_req = req.clone();
|
||||
|
||||
let cas_handler = move |_msg: Message| {
|
||||
let req = add_req;
|
||||
cas_runner.reply(&req, Body::from_type("add_ok"));
|
||||
};
|
||||
|
||||
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
|
||||
let read_handler = move |msg: Message| {
|
||||
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
|
||||
let payload = mk_payload(&[
|
||||
("from", value.into()),
|
||||
("to", (value + delta).into()),
|
||||
("key", KEY.into()),
|
||||
]);
|
||||
let body = Body::from_type("cas").with_payload(payload);
|
||||
read_runner.rpc("seq-kv", body, Box::new(cas_handler));
|
||||
};
|
||||
// kick it off by calling "read" on seq-kv:
|
||||
let payload = mk_payload(&[("key", KEY.into())]);
|
||||
let body = Body::from_type("read").with_payload(payload);
|
||||
runner.rpc("seq-kv", body, Box::new(read_handler));
|
||||
let cur = runner
|
||||
.rpc("seq-kv", read_body)
|
||||
.recv()
|
||||
.unwrap()
|
||||
.body
|
||||
.payload
|
||||
.get("value")
|
||||
.cloned()
|
||||
.unwrap()
|
||||
.as_i64()
|
||||
.unwrap();
|
||||
let cas_payload = mk_payload(&[
|
||||
("key", KEY.into()),
|
||||
("from", cur.into()),
|
||||
("to", (cur + delta).into()),
|
||||
]);
|
||||
// ERRORS BE HERE
|
||||
runner
|
||||
.rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload))
|
||||
.recv()
|
||||
.unwrap();
|
||||
runner.reply(&req, Body::from_type("add_ok"));
|
||||
}
|
||||
"read" => {
|
||||
let rn = runner.clone();
|
||||
let h = move |msg: Message| {
|
||||
let src = frm.clone();
|
||||
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
|
||||
let irt = msg_id;
|
||||
let payload = mk_payload(&[("value", value.into())]);
|
||||
let body = Body::from_type("read_ok")
|
||||
.with_in_reply_to(irt)
|
||||
.with_payload(payload);
|
||||
rn.send(&src, body);
|
||||
};
|
||||
let payload = mk_payload(&[("key", KEY.into())]);
|
||||
let body = Body::from_type("read").with_payload(payload);
|
||||
runner.rpc("seq-kv", body, Box::new(h));
|
||||
let val = runner
|
||||
.rpc("seq-kv", body)
|
||||
.recv()
|
||||
.unwrap()
|
||||
.body
|
||||
.payload
|
||||
.get("value")
|
||||
.cloned()
|
||||
.unwrap();
|
||||
let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)]));
|
||||
runner.reply(&req, body);
|
||||
}
|
||||
_ => {
|
||||
eprintln!("unknown type: {req:?}");
|
||||
|
|
|
@ -1,33 +1,14 @@
|
|||
use std::{
|
||||
io::BufRead,
|
||||
sync::{mpsc::channel, Arc, Mutex},
|
||||
thread,
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
||||
|
||||
fn main() {
|
||||
let out = std::io::stdout();
|
||||
let std_in = Arc::new(std::io::stdin());
|
||||
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let i = thread::spawn(move || {
|
||||
let g = std_in.lock();
|
||||
for line in g.lines().map_while(Result::ok) {
|
||||
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
|
||||
tx.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let node = GenUid;
|
||||
let node = Arc::new(Mutex::new(node));
|
||||
|
||||
let runner = Runner::new(node, out);
|
||||
let runner = Runner::new(node);
|
||||
|
||||
runner.run(rx, None);
|
||||
i.join().unwrap();
|
||||
runner.run(None);
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
io::{BufRead, Stdout, Write},
|
||||
rc::Rc,
|
||||
io::{BufRead, Write},
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc, LazyLock, Mutex, OnceLock,
|
||||
Arc, Mutex, OnceLock,
|
||||
},
|
||||
thread::{self, JoinHandle},
|
||||
thread::{self},
|
||||
};
|
||||
|
||||
pub mod protocol;
|
||||
|
@ -18,25 +17,57 @@ use serde_json::Value;
|
|||
pub mod kv;
|
||||
|
||||
pub type DynNode = Arc<Mutex<dyn Node>>;
|
||||
pub type Handler = Box<dyn FnOnce(Message)>; // -> Result<Option<Value>>>;
|
||||
pub type OnInit = Box<dyn Fn(&Runner)>;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, ErrorCode>;
|
||||
|
||||
pub type RpcPromise = Receiver<Message>;
|
||||
|
||||
static MSG_ID: AtomicU64 = AtomicU64::new(0);
|
||||
static OUTPUT: LazyLock<Stdout> = LazyLock::new(std::io::stdout);
|
||||
|
||||
pub trait Node {
|
||||
fn handle(&mut self, runner: &Rc<Runner>, msg: Message);
|
||||
fn handle(&mut self, runner: &Runner, msg: Message);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Network {
|
||||
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
||||
node_output_tx: Sender<Message>,
|
||||
}
|
||||
|
||||
impl Network {
|
||||
pub fn new() -> (Self, Receiver<Message>) {
|
||||
let (node_output_tx, node_output_rx) = channel();
|
||||
let net = Self {
|
||||
node_output_tx,
|
||||
promises: Arc::new(Mutex::new(HashMap::default())),
|
||||
};
|
||||
(net, node_output_rx)
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: Message) {
|
||||
self.node_output_tx.send(msg).unwrap();
|
||||
}
|
||||
|
||||
pub fn rpc(&self, msg: Message) -> RpcPromise {
|
||||
let (tx, rx) = channel();
|
||||
{
|
||||
let msg_id = msg.body.msg_id;
|
||||
let mut g = self.promises.lock().unwrap();
|
||||
g.insert(msg_id, tx);
|
||||
}
|
||||
self.send(msg);
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Runner {
|
||||
node: DynNode,
|
||||
node_id: OnceLock<String>,
|
||||
nodes: OnceLock<Vec<String>>,
|
||||
network: OnceLock<Network>,
|
||||
backdoor: OnceLock<Sender<Message>>,
|
||||
steps: Arc<AtomicUsize>,
|
||||
handlers: Arc<Mutex<HashMap<u64, Handler>>>,
|
||||
}
|
||||
|
||||
impl Runner {
|
||||
|
@ -45,44 +76,87 @@ impl Runner {
|
|||
node,
|
||||
nodes: OnceLock::new(),
|
||||
node_id: OnceLock::new(),
|
||||
network: OnceLock::new(),
|
||||
backdoor: OnceLock::new(),
|
||||
steps: Arc::new(AtomicUsize::new(0)),
|
||||
handlers: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(self: &Rc<Self>, input: Receiver<Message>, on_init: Option<OnInit>) {
|
||||
for msg in input.iter() {
|
||||
let typ = &msg.body.typ;
|
||||
if let "init" = typ.as_str() {
|
||||
pub fn run(&self, on_init: Option<OnInit>) {
|
||||
let (stdin_tx, stdin_rx) = channel();
|
||||
let (stdout_tx, stdout_rx) = channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let stdin = std::io::stdin().lock().lines();
|
||||
for line in stdin {
|
||||
stdin_tx.send(line.unwrap()).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut stdout = std::io::stdout().lock();
|
||||
for msg in stdout_rx {
|
||||
writeln!(&mut stdout, "{msg}").unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
self.run_internal(stdout_tx, stdin_rx, on_init);
|
||||
}
|
||||
|
||||
fn run_internal(
|
||||
&self,
|
||||
stdout_tx: Sender<String>,
|
||||
stdin_rx: Receiver<String>,
|
||||
on_init: Option<OnInit>,
|
||||
) {
|
||||
let (network, node_receiver) = Network::new();
|
||||
let _ = self.network.get_or_init(|| network.clone());
|
||||
self.run_output(stdout_tx, node_receiver);
|
||||
self.run_input(stdin_rx, network, on_init);
|
||||
}
|
||||
|
||||
fn run_input(&self, stdin_rx: Receiver<String>, network: Network, on_init: Option<OnInit>) {
|
||||
let (json_tx, json_rx) = channel();
|
||||
let _ = self.backdoor.get_or_init(|| json_tx.clone());
|
||||
|
||||
thread::spawn(move || {
|
||||
for line in stdin_rx {
|
||||
let msg: Message = serde_json::from_str(&line).unwrap();
|
||||
let irt = msg.body.in_reply_to;
|
||||
if let Some(tx) = network.promises.lock().unwrap().remove(&irt) {
|
||||
tx.send(msg).unwrap();
|
||||
} else {
|
||||
json_tx.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for msg in json_rx {
|
||||
let mut node = self.node.lock().unwrap();
|
||||
if msg.body.typ.as_str() == "init" {
|
||||
self.init(&msg);
|
||||
let body = Body::from_type("init_ok");
|
||||
self.reply(&msg, body);
|
||||
if let Some(ref h) = on_init {
|
||||
h(self);
|
||||
if let Some(ref on_init) = on_init {
|
||||
on_init(self);
|
||||
}
|
||||
} else {
|
||||
let irt = msg.body.in_reply_to;
|
||||
{
|
||||
let mut g = self.handlers.lock().unwrap();
|
||||
if let Some(h) = g.remove(&irt) {
|
||||
h(msg.clone());
|
||||
}
|
||||
}
|
||||
let mut n = self.node.lock().unwrap();
|
||||
n.handle(self, msg);
|
||||
node.handle(self, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rpc(&self, dest: &str, body: Body, handler: Handler) {
|
||||
let mut body = body;
|
||||
let msg_id = self.next_msg_id();
|
||||
body.msg_id = msg_id;
|
||||
{
|
||||
let mut g = self.handlers.lock().unwrap();
|
||||
g.insert(msg_id, handler);
|
||||
}
|
||||
self.send(dest, body);
|
||||
fn run_output(&self, stdout_tx: Sender<String>, node_output_rx: Receiver<Message>) {
|
||||
thread::spawn(move || {
|
||||
while let Ok(msg) = node_output_rx.recv() {
|
||||
let msg = serde_json::to_string(&msg).unwrap();
|
||||
stdout_tx.send(msg).unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn get_backdoor(&self) -> Sender<Message> {
|
||||
self.backdoor.get().unwrap().clone()
|
||||
}
|
||||
|
||||
pub fn node_id(&self) -> String {
|
||||
|
@ -124,10 +198,10 @@ impl Runner {
|
|||
.unwrap()
|
||||
.iter()
|
||||
.map(|s| s.as_str().unwrap().to_string())
|
||||
.collect::<Vec<_>>();
|
||||
.collect();
|
||||
|
||||
let _ = self.node_id.get_or_init(|| node_id.to_owned());
|
||||
let _ = self.nodes.get_or_init(|| nodes.to_vec());
|
||||
let _ = self.node_id.get_or_init(|| node_id);
|
||||
let _ = self.nodes.get_or_init(|| nodes);
|
||||
}
|
||||
|
||||
pub fn reply(&self, req: &Message, body: Body) {
|
||||
|
@ -148,36 +222,21 @@ impl Runner {
|
|||
dest: dest.to_string(),
|
||||
body,
|
||||
};
|
||||
let msg = serde_json::to_string(&msg).unwrap();
|
||||
self.writeln(&msg);
|
||||
self.network.get().unwrap().send(msg);
|
||||
}
|
||||
|
||||
fn writeln(&self, msg: &str) {
|
||||
let mut out = OUTPUT.lock();
|
||||
let msg = format!("{msg}\n");
|
||||
out.write_all(msg.as_bytes()).unwrap();
|
||||
out.flush().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used
|
||||
/// in the Runner::run() method. Clone the Sender if you want to inject messages
|
||||
/// into the Runner. Join the handle after `run()`.
|
||||
pub fn mk_stdin() -> (JoinHandle<()>, Sender<Message>, Receiver<Message>) {
|
||||
let stdin = std::io::stdin();
|
||||
let (tx, rx) = channel();
|
||||
let xtra_input = tx.clone();
|
||||
|
||||
let join = thread::spawn(move || {
|
||||
let g = stdin.lock();
|
||||
for line in g.lines().map_while(std::result::Result::ok) {
|
||||
if let Ok(msg) = serde_json::from_str(&line) {
|
||||
tx.send(msg).unwrap();
|
||||
}
|
||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
||||
let mut body = body;
|
||||
if body.msg_id == 0 {
|
||||
body.msg_id = self.next_msg_id();
|
||||
}
|
||||
});
|
||||
|
||||
(join, xtra_input, rx)
|
||||
let msg = Message {
|
||||
src: self.node_id().to_string(),
|
||||
dest: dest.to_string(),
|
||||
body,
|
||||
};
|
||||
self.network.get().unwrap().rpc(msg)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
|
||||
|
|
Loading…
Reference in a new issue