diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index b12e282..5dd3b5a 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -19,7 +19,7 @@ fn main() { } fn on_init(runner: &Runner) { - let tx = runner.get_input(); + let tx = runner.get_backdoor(); thread::spawn(move || loop { let millis = rand::thread_rng().gen_range(400..=800); thread::sleep(Duration::from_millis(millis)); diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index ca4bbc0..ef3cc66 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -2,6 +2,11 @@ use std::sync::{Arc, Mutex}; use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner}; +const KEY: &str = "COUNTER"; + +#[derive(Clone, Default)] +struct Counter; + fn main() { let node = Counter; let node = Arc::new(Mutex::new(node)); @@ -24,19 +29,38 @@ fn main() { runner.run(Some(on_init)); } -const KEY: &str = "COUNTER"; - -#[derive(Clone, Default)] -struct Counter; - impl Node for Counter { fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) { + let read_payload = mk_payload(&[("key", KEY.into())]); + let read_body = Body::from_type("read").with_payload(read_payload); let typ = req.body.typ.as_str(); - let frm = req.src.clone(); - let msg_id = req.body.msg_id.to_owned(); + //let frm = req.src.clone(); + //let msg_id = req.body.msg_id.to_owned(); match typ { "add" => { let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap(); + let cur = runner + .rpc("seq-kv", read_body) + .recv() + .unwrap() + .body + .payload + .get("value") + .cloned() + .unwrap() + .as_i64() + .unwrap(); + let cas_payload = mk_payload(&[ + ("key", KEY.into()), + ("from", cur.into()), + ("to", (cur + delta).into()), + ]); + // ERRORS BE HERE + runner + .rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload)) + .recv() + .unwrap(); + runner.reply(&req, Body::from_type("add_ok")); } "read" => { let payload = mk_payload(&[("key", KEY.into())]); diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index 06acdc6..ebcf869 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -29,15 +29,45 @@ pub trait Node { fn handle(&mut self, runner: &Runner, msg: Message); } -#[derive(Clone)] +#[derive(Debug, Clone)] +pub struct Network { + promises: Arc>>>, + node_output_tx: Sender, +} + +impl Network { + pub fn new() -> (Self, Receiver) { + let (node_output_tx, node_output_rx) = channel(); + let net = Self { + node_output_tx, + promises: Arc::new(Mutex::new(HashMap::default())), + }; + (net, node_output_rx) + } + + pub fn send(&self, msg: Message) { + self.node_output_tx.send(msg).unwrap(); + } + + pub fn rpc(&self, msg: Message) -> RpcPromise { + let (tx, rx) = channel(); + { + let msg_id = msg.body.msg_id; + let mut g = self.promises.lock().unwrap(); + g.insert(msg_id, tx); + } + self.send(msg); + rx + } +} + pub struct Runner { node: DynNode, node_id: OnceLock, nodes: OnceLock>, + network: OnceLock, + backdoor: OnceLock>, steps: Arc, - promises: Arc>>>, - input: OnceLock>, - output: OnceLock>, } impl Runner { @@ -46,24 +76,64 @@ impl Runner { node, nodes: OnceLock::new(), node_id: OnceLock::new(), + network: OnceLock::new(), + backdoor: OnceLock::new(), steps: Arc::new(AtomicUsize::new(0)), - promises: Default::default(), - input: OnceLock::new(), - output: OnceLock::new(), } } pub fn run(&self, on_init: Option) { - let (stdin_tx, stdin_rx) = run_stdin(); - let _ = self.input.get_or_init(|| stdin_tx); - + let (stdin_tx, stdin_rx) = channel(); let (stdout_tx, stdout_rx) = channel(); - let _ = self.output.get_or_init(|| stdout_tx); - run_stdout(stdout_rx); - for msg in stdin_rx { - let typ = &msg.body.typ; - if let "init" = typ.as_str() { + thread::spawn(move || { + let stdin = std::io::stdin().lock().lines(); + for line in stdin { + stdin_tx.send(line.unwrap()).unwrap(); + } + }); + + thread::spawn(move || { + let mut stdout = std::io::stdout().lock(); + for msg in stdout_rx { + writeln!(&mut stdout, "{msg}").unwrap(); + } + }); + + self.run_internal(stdout_tx, stdin_rx, on_init); + } + + fn run_internal( + &self, + stdout_tx: Sender, + stdin_rx: Receiver, + on_init: Option, + ) { + let (network, node_receiver) = Network::new(); + let _ = self.network.get_or_init(|| network.clone()); + self.run_output(stdout_tx, node_receiver); + self.run_input(stdin_rx, network, on_init); + } + + fn run_input(&self, stdin_rx: Receiver, network: Network, on_init: Option) { + let (json_tx, json_rx) = channel(); + let _ = self.backdoor.get_or_init(|| json_tx.clone()); + + thread::spawn(move || { + for line in stdin_rx { + let msg: Message = serde_json::from_str(&line).unwrap(); + let irt = msg.body.in_reply_to; + if let Some(tx) = network.promises.lock().unwrap().remove(&irt) { + tx.send(msg).unwrap(); + } else { + json_tx.send(msg).unwrap(); + } + } + }); + + for msg in json_rx { + let mut node = self.node.lock().unwrap(); + if msg.body.typ.as_str() == "init" { self.init(&msg); let body = Body::from_type("init_ok"); self.reply(&msg, body); @@ -71,35 +141,22 @@ impl Runner { on_init(self); } } else { - let irt = msg.body.in_reply_to; - { - let mut g = self.promises.lock().unwrap(); - if let Some(h) = g.remove(&irt) { - h.send(msg.clone()).unwrap(); - continue; - } - } - let mut n = self.node.lock().unwrap(); - n.handle(self, msg); + node.handle(self, msg); } } } - pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { - let mut body = body; - let msg_id = self.next_msg_id(); - body.msg_id = msg_id; - let (tx, rx) = channel(); - { - let mut g = self.promises.lock().unwrap(); - g.insert(msg_id, tx); - } - self.send(dest, body); - rx + fn run_output(&self, stdout_tx: Sender, node_output_rx: Receiver) { + thread::spawn(move || { + while let Ok(msg) = node_output_rx.recv() { + let msg = serde_json::to_string(&msg).unwrap(); + stdout_tx.send(msg).unwrap(); + } + }); } - pub fn get_input(&self) -> Sender { - self.input.get().cloned().unwrap() + pub fn get_backdoor(&self) -> Sender { + self.backdoor.get().unwrap().clone() } pub fn node_id(&self) -> String { @@ -165,37 +222,21 @@ impl Runner { dest: dest.to_string(), body, }; - self.output.get().unwrap().send(msg).unwrap(); + self.network.get().unwrap().send(msg); } -} -/// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used -/// in the Runner::run() method. Clone the Sender if you want to inject messages -/// into the Runner. Join the handle after `run()`. -pub fn run_stdin() -> (Sender, Receiver) { - let stdin = std::io::stdin(); - let (tx, rx) = channel(); - let xtra_input = tx.clone(); - - thread::spawn(move || { - let g = stdin.lock(); - for line in g.lines().map_while(std::result::Result::ok) { - if let Ok(msg) = serde_json::from_str(&line) { - tx.send(msg).unwrap(); - } + pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { + let mut body = body; + if body.msg_id == 0 { + body.msg_id = self.next_msg_id(); } - }); - - (xtra_input, rx) -} - -fn run_stdout(rx: Receiver) { - thread::spawn(move || { - let mut stdout = std::io::stdout().lock(); - while let Ok(msg) = rx.recv() { - writeln!(stdout, "{}", serde_json::to_string(&msg).unwrap()).unwrap(); - } - }); + let msg = Message { + src: self.node_id().to_string(), + dest: dest.to_string(), + body, + }; + self.network.get().unwrap().rpc(msg) + } } pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {