diff --git a/Cargo.lock b/Cargo.lock index 21a7c2a..43dc217 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,11 +2,29 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gg-broadcast" version = "0.0.1" dependencies = [ "nebkor-maelstrom", + "rand", "serde_json", ] @@ -32,6 +50,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "libc" +version = "0.2.155" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" + [[package]] name = "nebkor-maelstrom" version = "0.0.1" @@ -41,6 +65,12 @@ dependencies = [ "serde_repr", ] +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.83" @@ -59,6 +89,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "ryu" version = "1.0.18" @@ -123,3 +183,9 @@ name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" diff --git a/gg-broadcast/Cargo.toml b/gg-broadcast/Cargo.toml index ed7d5d3..60f714d 100644 --- a/gg-broadcast/Cargo.toml +++ b/gg-broadcast/Cargo.toml @@ -8,3 +8,4 @@ license-file.workspace = true [dependencies] serde_json.workspace = true nebkor-maelstrom = { path = "../nebkor-maelstrom" } +rand = "0.8.5" diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index 6201256..c648821 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -1,83 +1,131 @@ -use std::{collections::HashMap, rc::Rc, sync::Mutex}; +use std::{ + collections::{HashMap, HashSet}, + io::BufRead, + sync::{mpsc::channel, Arc, Mutex}, + thread, + time::Duration, +}; use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner}; +use rand::Rng; fn main() { - let out = std::io::stdout().lock(); - let input = std::io::stdin().lock(); + let out = std::io::stdout(); + let std_in = Arc::new(std::io::stdin()); + + let (tx, rx) = channel(); + let input = tx.clone(); + + let i = thread::spawn(move || { + let g = std_in.lock(); + for line in g.lines().map_while(Result::ok) { + if let Ok(msg) = serde_json::from_str::(&line) { + input.send(msg).unwrap(); + } + } + }); let node = BCaster::default(); - let node = Rc::new(Mutex::new(node)); + let node = Arc::new(Mutex::new(node)); - let runner = Runner::new(out, node); + let runner = Runner::new(node, out); + let runner = &runner; - runner.run(input); + let g = thread::spawn(move || loop { + let millis = rand::thread_rng().gen_range(120..=180); + thread::sleep(Duration::from_millis(millis)); + let body = Body::from_type("do_gossip"); + let msg = Message { + body, + ..Default::default() + }; + tx.send(msg).unwrap(); + }); + + runner.run(rx); + let _ = i.join(); + let _ = g.join(); } -type Store = HashMap; -type Gossips = HashMap; - #[derive(Clone, Default)] struct BCaster { - pub store: Store, - pub gossips: Gossips, + store: HashSet, + gossips: HashMap>, + neighbors: HashSet, +} + +const NUM_NEIGHBORS: i32 = 5; + +impl BCaster { + fn topology(&mut self, nodes: &[String], id: &str) { + let mut nodes = nodes.to_vec(); + let len = nodes.len(); + nodes.sort_unstable(); + let idx = nodes + .iter() + .enumerate() + .find(|n| n.1.as_str() == id) + .unwrap() + .0; + for i in 1..=NUM_NEIGHBORS { + let ni = idx as i32 - i; + let left = if ni < 0 { + let ni = -ni; + let ni = ni as usize % len; + len - ni + } else { + ni as usize + }; + + let right = (idx + i as usize) % len; + self.neighbors.insert(nodes[left].clone()); + self.neighbors.insert(nodes[right].clone()); + } + } + + fn gossip(&self, runner: &Runner) { + let mut rng = rand::thread_rng(); + let rng = &mut rng; + for node in self.neighbors.iter() { + let mut goss = if let Some(neighbor) = self.gossips.get(node) { + self.store.difference(neighbor).cloned().collect() + } else { + self.store.clone() + }; + for v in self.store.iter().filter(|_| rng.gen_bool(0.11)) { + goss.insert(*v); + } + if !goss.is_empty() { + let goss: Vec<_> = goss.into_iter().collect(); + let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect(); + let body = Body::from_type("gossip").with_payload(payload); + runner.send(node, body); + } + } + } } impl Node for BCaster { fn handle(&mut self, runner: &Runner, req: &Message) { let typ = req.body.typ.as_str(); let frm = req.src.as_str(); - let mid = req.body.msg_id; let nid = runner.node_id(); let nid = nid.as_str(); - let key = format!("{frm}-{mid}"); - let key = key.as_str(); - - { - for (_, gossip) in self.gossips.iter() { - runner.send(gossip.clone()); - } - } match typ { - "gossip_ok" => { - let id = req.body.in_reply_to; - self.gossips.remove(&id); + "do_gossip" => { + self.gossip(runner); } "broadcast" => { let val = req.body.payload.get("message").and_then(|v| v.as_i64()); if let Some(val) = val { - { - // there's only one thread, safe to unwrap - self.store.entry(key.to_owned()).or_insert(val); - } - - let extra: Payload = [ - ("key".to_string(), key.into()), - ("val".to_string(), val.into()), - ] - .into_iter() - .collect(); - let gossip = Body::from_type("gossip").with_payload(extra); - for node in runner.nodes().iter().filter(|n| n.as_str() != nid) { - let id = runner.next_msg_id(); - let gossip = gossip.clone().with_msg_id(id); - let msg = Message { - body: gossip, - src: nid.to_string(), - dest: node.to_string(), - }; - self.gossips.insert(id, msg.clone()); - - runner.send(msg); - } + self.store.insert(val); let body = Body::from_type("broadcast_ok"); runner.reply(req, body); } } "read" => { - let vals = self.store.values().cloned().collect::>(); - + let vals = self.store.iter().copied().collect::>(); let payload: Payload = [("messages".to_string(), vals.into())] .into_iter() .collect(); @@ -86,17 +134,29 @@ impl Node for BCaster { runner.reply(req, body); } "topology" => { + self.topology(runner.nodes(), nid); let body = Body::from_type("topology_ok"); runner.reply(req, body); } "gossip" => { - let key = req.body.payload.get("key").unwrap().as_str().unwrap(); - let val = req.body.payload.get("val").unwrap().as_i64().unwrap(); + let goss = req + .body + .payload + .get("goss") + .unwrap() + .as_array() + .unwrap() + .iter() + .map(|v| v.as_i64().unwrap()) + .collect::>(); - self.store.entry(key.to_string()).or_insert(val); - let body = Body::from_type("gossip_ok"); - runner.reply(req, body); + self.store.extend(goss.clone()); + self.gossips + .entry(frm.to_string()) + .or_default() + .extend(goss); } + _ => { eprintln!("unknown type: {req:?}"); } diff --git a/gg-echo/src/main.rs b/gg-echo/src/main.rs index 36b6144..29b1124 100644 --- a/gg-echo/src/main.rs +++ b/gg-echo/src/main.rs @@ -1,4 +1,8 @@ -use std::{rc::Rc, sync::Mutex}; +use std::{ + io::BufRead, + sync::{mpsc::channel, Arc, Mutex}, + thread, +}; use nebkor_maelstrom::{Body, Message, Node, Runner}; @@ -7,24 +11,34 @@ struct Echo; impl Node for Echo { fn handle(&mut self, runner: &Runner, msg: &Message) { let typ = &msg.body.typ; - match typ.as_str() { - "echo" => { - let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); - runner.reply(&msg, body); - } - _ => {} + if typ.as_str() == "echo" { + let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); + runner.reply(msg, body); } } } fn main() { - let out = std::io::stdout().lock(); - let input = std::io::stdin().lock(); + let out = std::io::stdout(); + let std_in = Arc::new(std::io::stdin()); + + let (tx, rx) = channel(); + + let i = thread::spawn(move || { + let g = std_in.lock(); + for line in g.lines().map_while(Result::ok) { + if let Ok(msg) = serde_json::from_str::(&line) { + tx.send(msg).unwrap(); + } + } + }); let node = Echo; - let node = Rc::new(Mutex::new(node)); - let runner = Runner::new(out, node); + let node = Arc::new(Mutex::new(node)); - runner.run(input); + let runner = Runner::new(node, out); + + runner.run(rx); + i.join().unwrap(); } diff --git a/gg-uid/src/main.rs b/gg-uid/src/main.rs index 2a6168a..72800cb 100644 --- a/gg-uid/src/main.rs +++ b/gg-uid/src/main.rs @@ -1,17 +1,33 @@ -use std::{rc::Rc, sync::Mutex}; +use std::{ + io::BufRead, + sync::{mpsc::channel, Arc, Mutex}, + thread, +}; use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner}; fn main() { - let out = std::io::stdout().lock(); - let input = std::io::stdin().lock(); + let out = std::io::stdout(); + let std_in = Arc::new(std::io::stdin()); + + let (tx, rx) = channel(); + + let i = thread::spawn(move || { + let g = std_in.lock(); + for line in g.lines().map_while(Result::ok) { + if let Ok(msg) = serde_json::from_str::(&line) { + tx.send(msg).unwrap(); + } + } + }); let node = GenUid; - let node = Rc::new(Mutex::new(node)); + let node = Arc::new(Mutex::new(node)); - let runner = Runner::new(out, node); + let runner = Runner::new(node, out); - runner.run(input); + runner.run(rx); + i.join().unwrap(); } #[derive(Clone, Default)] diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index cb38e55..f6c929a 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -1,57 +1,53 @@ use std::{ - cell::OnceCell, - io::{BufRead, StdinLock, StdoutLock, Write}, - rc::Rc, + io::{Stdout, Write}, sync::{ - atomic::{AtomicU64, Ordering}, - Mutex, + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, OnceLock, }, }; pub mod protocol; pub use protocol::{Body, Message}; -pub type DynNode = Rc>; +pub type DynNode = Arc>; pub trait Node { fn handle(&mut self, runner: &Runner, msg: &Message); } -pub struct Runner<'io> { +pub struct Runner { msg_id: AtomicU64, node: DynNode, - node_id: OnceCell, - nodes: OnceCell>, - output: Mutex>, - steps: AtomicU64, + node_id: OnceLock, + nodes: OnceLock>, + steps: AtomicUsize, + output: Arc>, } -impl<'io> Runner<'io> { - pub fn new(output: StdoutLock<'io>, node: DynNode) -> Self { +impl Runner { + pub fn new(node: DynNode, output: Stdout) -> Self { Runner { - output: Mutex::new(output), node, msg_id: AtomicU64::new(1), - nodes: OnceCell::new(), - node_id: OnceCell::new(), - steps: AtomicU64::new(0), + nodes: OnceLock::new(), + node_id: OnceLock::new(), + steps: AtomicUsize::new(0), + output: Arc::new(Mutex::new(output)), } } - pub fn run(&self, input: StdinLock) { - for line in input.lines().map_while(Result::ok) { - if let Ok(msg) = serde_json::from_str::(&line) { - let typ = &msg.body.typ; - if let "init" = typ.as_str() { - self.init(&msg); - let body = Body::from_type("init_ok"); - self.reply(&msg, body); - } else { - let mut n = self.node.lock().unwrap(); - n.handle(self, &msg); - } - } + pub fn run(&self, input: std::sync::mpsc::Receiver) { + for msg in input.iter() { self.steps.fetch_add(1, Ordering::SeqCst); + let typ = &msg.body.typ; + if let "init" = typ.as_str() { + self.init(&msg); + let body = Body::from_type("init_ok"); + self.reply(&msg, body); + } else { + let mut n = self.node.lock().unwrap(); + n.handle(self, &msg); + } } } @@ -63,10 +59,18 @@ impl<'io> Runner<'io> { self.msg_id.fetch_add(1, Ordering::SeqCst) } + pub fn cur_msg_id(&self) -> u64 { + self.msg_id.load(Ordering::SeqCst) + } + pub fn nodes(&self) -> &[String] { self.nodes.get().unwrap() } + pub fn steps(&self) -> usize { + self.steps.load(Ordering::SeqCst) + } + pub fn init(&self, msg: &Message) { let node_id = msg .body @@ -94,20 +98,22 @@ impl<'io> Runner<'io> { pub fn reply(&self, req: &Message, body: Body) { let mut body = body; - let src = self.node_id.get().unwrap().to_owned(); let dest = req.src.clone(); let in_reply_to = req.body.msg_id; body.in_reply_to = in_reply_to; - let msg = Message { src, dest, body }; - self.send(msg); + self.send(&dest, body); } - pub fn send(&self, msg: Message) { - let mut msg = msg; - if msg.body.msg_id == 0 { - let mid = self.next_msg_id(); - msg.body.msg_id = mid; + pub fn send(&self, dest: &str, body: Body) { + let mut body = body; + if body.msg_id == 0 { + body.msg_id = self.next_msg_id(); } + let msg = Message { + src: self.node_id().to_string(), + dest: dest.to_string(), + body, + }; let msg = serde_json::to_string(&msg).unwrap(); self.writeln(&msg); }