chatty-catties/gg-broadcast/src/main.rs
Joe Ardent 461087023d Make handle take Message by value.
This makes closures easier to deal with in RPC calls.
2024-05-29 13:58:11 -07:00

165 lines
4.8 KiB
Rust

use std::{
collections::{HashMap, HashSet},
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread,
time::Duration,
};
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
use rand::Rng;
fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let input = tx.clone();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
input.send(msg).unwrap();
}
}
});
let node = BCaster::default();
let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out);
let runner = &runner;
let g = thread::spawn(move || loop {
let millis = rand::thread_rng().gen_range(400..=800);
thread::sleep(Duration::from_millis(millis));
let body = Body::from_type("do_gossip");
let msg = Message {
body,
..Default::default()
};
tx.send(msg).unwrap();
});
runner.run(rx);
let _ = i.join();
let _ = g.join();
}
#[derive(Clone, Default)]
struct BCaster {
store: HashSet<i64>,
gossips: HashMap<String, HashSet<i64>>,
neighbors: HashSet<String>,
}
const NUM_NEIGHBORS: i32 = 5;
impl BCaster {
fn topology(&mut self, nodes: &[String], id: &str) {
let mut nodes = nodes.to_vec();
let len = nodes.len();
nodes.sort_unstable();
let idx = nodes
.iter()
.enumerate()
.find(|n| n.1.as_str() == id)
.unwrap()
.0;
for i in 1..=NUM_NEIGHBORS {
let ni = idx as i32 - i;
let left = if ni < 0 {
let ni = -ni;
let ni = ni as usize % len;
len - ni
} else {
ni as usize
};
let right = (idx + i as usize) % len;
self.neighbors.insert(nodes[left].clone());
self.neighbors.insert(nodes[right].clone());
}
}
fn gossip(&self, runner: &Runner) {
let mut rng = rand::thread_rng();
let rng = &mut rng;
for node in self.neighbors.iter() {
let mut goss = if let Some(neighbor) = self.gossips.get(node) {
self.store.difference(neighbor).cloned().collect()
} else {
self.store.clone()
};
for v in self.store.iter().filter(|_| rng.gen_bool(0.11)) {
goss.insert(*v);
}
if !goss.is_empty() {
let goss: Vec<_> = goss.into_iter().collect();
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
let body = Body::from_type("gossip").with_payload(payload);
runner.send(node, body);
}
}
}
}
impl Node for BCaster {
fn handle(&mut self, runner: &Runner, req: Message) {
let typ = req.body.typ.as_str();
let frm = req.src.as_str();
let nid = runner.node_id();
let nid = nid.as_str();
match typ {
"do_gossip" => {
self.gossip(runner);
}
"broadcast" => {
let val = req.body.payload.get("message").and_then(|v| v.as_i64());
if let Some(val) = val {
self.store.insert(val);
let body = Body::from_type("broadcast_ok");
runner.reply(&req, body);
}
}
"read" => {
let vals = self.store.iter().copied().collect::<Vec<_>>();
let payload: Payload = [("messages".to_string(), vals.into())]
.into_iter()
.collect();
let body = Body::from_type("read_ok").with_payload(payload);
runner.reply(&req, body);
}
"topology" => {
self.topology(runner.nodes(), nid);
let body = Body::from_type("topology_ok");
runner.reply(&req, body);
}
"gossip" => {
let goss = req
.body
.payload
.get("goss")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|v| v.as_i64().unwrap())
.collect::<HashSet<_>>();
self.store.extend(goss.clone());
self.gossips
.entry(frm.to_string())
.or_default()
.extend(goss);
}
_ => {
eprintln!("unknown type: {req:?}");
}
}
}
}