diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index 2598cb5..3537cb7 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -106,7 +106,7 @@ impl BCaster { } impl Node for BCaster { - fn handle(&mut self, runner: &Runner, req: &Message) { + fn handle(&mut self, runner: &Runner, req: Message) { let typ = req.body.typ.as_str(); let frm = req.src.as_str(); let nid = runner.node_id(); @@ -121,7 +121,7 @@ impl Node for BCaster { if let Some(val) = val { self.store.insert(val); let body = Body::from_type("broadcast_ok"); - runner.reply(req, body); + runner.reply(&req, body); } } "read" => { @@ -131,12 +131,12 @@ impl Node for BCaster { .collect(); let body = Body::from_type("read_ok").with_payload(payload); - runner.reply(req, body); + runner.reply(&req, body); } "topology" => { self.topology(runner.nodes(), nid); let body = Body::from_type("topology_ok"); - runner.reply(req, body); + runner.reply(&req, body); } "gossip" => { let goss = req diff --git a/gg-echo/src/main.rs b/gg-echo/src/main.rs index 29b1124..f6024e6 100644 --- a/gg-echo/src/main.rs +++ b/gg-echo/src/main.rs @@ -9,11 +9,11 @@ use nebkor_maelstrom::{Body, Message, Node, Runner}; struct Echo; impl Node for Echo { - fn handle(&mut self, runner: &Runner, msg: &Message) { + fn handle(&mut self, runner: &Runner, msg: Message) { let typ = &msg.body.typ; if typ.as_str() == "echo" { let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); - runner.reply(msg, body); + runner.reply(&msg, body); } } } diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index cad704a..08df8c3 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -1,13 +1,10 @@ use std::{ - collections::{HashMap, HashSet}, - io::BufRead, - sync::{mpsc::channel, Arc, Mutex}, + sync::{Arc, Mutex}, thread, time::Duration, }; -use nebkor_maelstrom::{mk_payload, mk_stdin, protocol::Payload, Body, Message, Node, Runner}; -use rand::Rng; +use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner}; fn main() { let out = std::io::stdout(); @@ -16,11 +13,10 @@ fn main() { let node = Arc::new(Mutex::new(node)); let runner = Runner::new(node, out); - let runner = &runner; let (i, extra_input, rx) = mk_stdin(); let init = thread::spawn(move || { - thread::sleep(Duration::from_millis(101)); + thread::sleep(Duration::from_millis(10)); extra_input .send(Message { body: Body::from_type("kv_init"), @@ -40,36 +36,41 @@ const KEY: &str = "COUNTER"; struct Counter; impl Node for Counter { - fn handle(&mut self, runner: &Runner, req: &Message) { + fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) { let typ = req.body.typ.as_str(); - let frm = req.src.as_str(); - let nid = runner.node_id(); - let nid = nid.as_str(); - let msg_id = req.body.msg_id; + let frm = req.src.clone(); + let msg_id = req.body.msg_id.to_owned(); match typ { "kv_init" => { - let mut nodes = runner.nodes().to_vec(); - nodes.sort_unstable(); - if nodes[0] == nid { - let payload = mk_payload(&[ - ("key", KEY.into()), - ("from", 0i64.into()), - ("to", 0i64.into()), - ("create_if_not_exists", true.into()), - ]); - let body = Body::from_type("cas").with_payload(payload); - runner.send("seq-kv", body); - } + let payload = mk_payload(&[ + ("key", KEY.into()), + ("from", 0i64.into()), + ("to", 0i64.into()), + ("create_if_not_exists", true.into()), + ]); + let body = Body::from_type("cas").with_payload(payload); + runner.send("seq-kv", body); } "add" => { - let mut h = |msg: &Message| { - let irt = msg_id; - let body = Body::from_type("add_ok").with_in_reply_to(irt); - runner.send(frm, body); - }; + runner.reply(&req, Body::from_type("add_ok")); + } + "read" => { + let rn = runner.clone(); + let h = move |msg: Message| { + let src = frm.clone(); + let value = msg.body.payload.get("value").unwrap().as_i64().unwrap(); + let irt = msg_id; + let payload = mk_payload(&[("value", value.into())]); + let body = Body::from_type("read_ok") + .with_in_reply_to(irt) + .with_payload(payload); + rn.send(&src, body); + }; + let payload = mk_payload(&[("key", KEY.into())]); + let body = Body::from_type("read").with_payload(payload); + runner.rpc("seq-kv", body, Box::new(h)); } - "read" => {} _ => { eprintln!("unknown type: {req:?}"); } diff --git a/gg-uid/src/main.rs b/gg-uid/src/main.rs index 72800cb..3c3bb85 100644 --- a/gg-uid/src/main.rs +++ b/gg-uid/src/main.rs @@ -34,7 +34,7 @@ fn main() { struct GenUid; impl Node for GenUid { - fn handle(&mut self, runner: &Runner, msg: &Message) { + fn handle(&mut self, runner: &Runner, msg: Message) { let id = runner.node_id(); let mid = runner.next_msg_id(); if msg.body.typ == "generate" { @@ -45,7 +45,7 @@ impl Node for GenUid { .with_msg_id(mid) .with_payload(payload); - runner.reply(msg, body); + runner.reply(&msg, body); } } } diff --git a/nebkor-maelstrom/src/kv.rs b/nebkor-maelstrom/src/kv.rs index 10296aa..3fc224b 100644 --- a/nebkor-maelstrom/src/kv.rs +++ b/nebkor-maelstrom/src/kv.rs @@ -1,4 +1,4 @@ -use serde::{Deserialize, Serialize}; +/* use serde_json::Value; use crate::Body; @@ -22,7 +22,8 @@ impl Kv { todo!() } - pub fn cas(&self, val: Value) -> Body { + pub fn cas(&self, key: &str, from: Value, to: Value, create: bool) -> Body { todo!() } } +*/ diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index f7df692..b496692 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -1,5 +1,4 @@ use std::{ - any::Any, collections::HashMap, io::{BufRead, Stdout, Write}, sync::{ @@ -17,18 +16,19 @@ use serde_json::Value; pub mod kv; pub type DynNode = Arc>; -pub type Handler = Box; +pub type Handler = Box; pub trait Node { - fn handle(&mut self, runner: &Runner, msg: &Message); + fn handle(&mut self, runner: &Runner, msg: Message); } +#[derive(Clone)] pub struct Runner { - msg_id: AtomicU64, + msg_id: Arc, node: DynNode, node_id: OnceLock, nodes: OnceLock>, - steps: AtomicUsize, + steps: Arc, output: Arc>, handlers: Arc>>, } @@ -37,10 +37,10 @@ impl Runner { pub fn new(node: DynNode, output: Stdout) -> Self { Runner { node, - msg_id: AtomicU64::new(1), + msg_id: Arc::new(AtomicU64::new(1)), nodes: OnceLock::new(), node_id: OnceLock::new(), - steps: AtomicUsize::new(0), + steps: Arc::new(AtomicUsize::new(0)), output: Arc::new(Mutex::new(output)), handlers: Default::default(), } @@ -58,11 +58,11 @@ impl Runner { { let mut g = self.handlers.lock().unwrap(); if let Some(mut h) = g.remove(&irt) { - h(&msg); + h(msg.clone()); } } let mut n = self.node.lock().unwrap(); - n.handle(self, &msg); + n.handle(self, msg); } } } @@ -125,10 +125,10 @@ impl Runner { pub fn reply(&self, req: &Message, body: Body) { let mut body = body; - let dest = req.src.clone(); + let dest = req.src.as_str(); let in_reply_to = req.body.msg_id; body.in_reply_to = in_reply_to; - self.send(&dest, body); + self.send(dest, body); } pub fn send(&self, dest: &str, body: Body) {