diff --git a/gg-kafka/src/main.rs b/gg-kafka/src/main.rs index 0baff4e..456eb7a 100644 --- a/gg-kafka/src/main.rs +++ b/gg-kafka/src/main.rs @@ -1,75 +1,42 @@ use std::collections::{BTreeMap, HashMap}; -use nebkor_maelstrom::{mk_payload, Body, Message, Node, Payload, Runner}; +use nebkor_maelstrom::{kv::Kv, mk_payload, Body, Message, Node, Payload, Runner}; type Logs = HashMap>; fn main() { - let node = Roach::new(20); + let node = Roach::default(); let runner = Runner::new(node); let on_init = Box::new(on_init); runner.run(Some(on_init)); } fn on_init(runner: &Runner) { - let msg = Message::from_body(Body::from_type("init_log")); + let msg = Message::from_body(Body::from_type("do_init")); runner.get_backdoor().send(msg).unwrap(); } -#[derive(Debug, Default)] -struct OffsetState { - last_page: usize, - current_chapter: usize, -} - #[derive(Debug, Default)] struct Roach { - // these first three get written to as messages come in logs: Logs, - state: HashMap, committed_offsets: HashMap, - - // these are initialized on init and then just read - offset_base: usize, - sub_chapters: usize, // number of nodes - pages: usize, + last_offsets: HashMap, + neighbors: Vec, } impl Roach { - pub fn new(pages: usize) -> Self { - Roach { - pages, - ..Default::default() + fn gossip(&mut self, runner: &Runner, key: &str, val: u64) { + let offset = self.last_offsets.get(key).unwrap(); + for neighbor in self.neighbors.iter() { + let payload = mk_payload(&[ + ("key", key.into()), + ("value", val.into()), + ("offset", (*offset).into()), + ]); + let body = Body::from_type("gossip").with_payload(payload); + runner.send(neighbor, body); } } - - fn next_offset(&mut self, key: &str) -> u64 { - let state = self.state.entry(key.to_string()).or_default(); - if state.last_page + 1 < self.pages { - state.last_page += 1; - } else { - state.last_page = 0; - state.current_chapter += 1; - } - let cur_chap = state.current_chapter; - let offset_base = self.offset_base; - let start = (offset_base * self.pages) + (cur_chap * (self.pages * self.sub_chapters)); - (start + state.last_page) as u64 - } - - fn init_log(&mut self, runner: &Runner) { - self.sub_chapters = runner.nodes().len(); - let id = runner.node_id(); - let mut nodes = runner.nodes().to_owned(); - nodes.sort(); - let idx = nodes - .iter() - .enumerate() - .find(|e| e.1.as_str() == id) - .unwrap() - .0; - self.offset_base = idx; - } } impl Node for Roach { @@ -77,21 +44,59 @@ impl Node for Roach { let typ = msg.typ(); match typ { - "init_log" => { - self.init_log(runner); + "do_init" => { + let id = runner.node_id(); + let nodes = runner + .nodes() + .iter() + .filter(|n| n.as_str() != id) + .cloned() + .collect(); + self.neighbors = nodes; + } + "gossip" => { + let offset = msg.body.payload.get("offset").unwrap().as_u64().unwrap(); + let key = msg.body.payload.get("key").unwrap().as_str().unwrap(); + let val = msg.body.payload.get("value").unwrap().as_u64().unwrap(); + self.logs + .entry(key.to_owned()) + .or_default() + .entry(offset) + .or_insert(val); + let last = self.last_offsets.entry(key.to_string()).or_default(); + *last = offset.max(*last); } "send" => { let key = msg.body.payload.get("key").unwrap().as_str().unwrap(); let val = msg.body.payload.get("msg").unwrap().as_u64().unwrap(); - let offset = self.next_offset(key); + let kv = Kv::lin(); + let offset = self.last_offsets.entry(key.to_string()).or_default(); + let mut cur = *offset; + let mut new = cur + 1; + + loop { + match kv.cas(runner, key, cur.into(), new.into(), true) { + Ok(_) => { + *offset = new; + break; + } + _ => { + cur = kv.read(runner, key).unwrap().unwrap().as_u64().unwrap(); + new = cur + 1; + } + } + } + self.logs .entry(key.to_string()) .or_default() - .entry(offset) + .entry(*offset) .or_insert(val); - let payload = mk_payload(&[("offset", offset.into())]); + let payload = mk_payload(&[("offset", (*offset).into())]); let body = Body::from_type("send_ok").with_payload(payload); + + self.gossip(runner, key, val); runner.reply(&msg, body); } "poll" => {