From 5222d55a6164b3357f13568f6df99db4ad319698 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Thu, 6 Jun 2024 22:19:13 -0700 Subject: [PATCH] passes 5a --- Cargo.lock | 8 +++ Cargo.toml | 2 +- gg-kafka/Cargo.toml | 10 ++++ gg-kafka/src/main.rs | 130 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 gg-kafka/Cargo.toml create mode 100644 gg-kafka/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index da8e96b..9de2658 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,14 @@ dependencies = [ "serde_json", ] +[[package]] +name = "gg-kafka" +version = "0.0.1" +dependencies = [ + "nebkor-maelstrom", + "serde_json", +] + [[package]] name = "gg-uid" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 5a26b77..1d66af7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter"] +members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka"] resolver = "2" [workspace.package] diff --git a/gg-kafka/Cargo.toml b/gg-kafka/Cargo.toml new file mode 100644 index 0000000..5597232 --- /dev/null +++ b/gg-kafka/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "gg-kafka" +edition = "2021" +version.workspace = true +authors.workspace = true +license-file.workspace = true + +[dependencies] +nebkor-maelstrom.workspace = true +serde_json.workspace = true diff --git a/gg-kafka/src/main.rs b/gg-kafka/src/main.rs new file mode 100644 index 0000000..9c64b69 --- /dev/null +++ b/gg-kafka/src/main.rs @@ -0,0 +1,130 @@ +use std::collections::{BTreeMap, HashMap}; + +use nebkor_maelstrom::{mk_payload, Body, Message, Node, Payload, Runner}; + +type Logs = HashMap>; + +fn main() { + let node = Roach::new(20); + let runner = Runner::new(node); + let on_init = Box::new(on_init); + runner.run(Some(on_init)); +} + +fn on_init(runner: &Runner) { + let msg = Message::from_body(Body::from_type("init_log")); + runner.get_backdoor().send(msg).unwrap(); +} + +#[derive(Debug, Default)] +struct Roach { + // these first four get written to as messages come in + logs: Logs, + last_pages: HashMap, + current_chapter: usize, + committed_offsets: HashMap, + + // these are initialized on init and then just read + offset_base: usize, + sub_chapters: usize, // number of nodes + pages: usize, +} + +impl Roach { + pub fn new(pages: usize) -> Self { + Roach { + pages, + ..Default::default() + } + } + + fn next_offset(&mut self, key: &str) -> u64 { + let last_page = self.last_pages.entry(key.to_string()).or_default(); + if *last_page + 1 < self.pages as u64 { + *last_page += 1; + } else { + *last_page = 0; + self.current_chapter += 1; + } + let cur_chap = self.current_chapter; + let offset_base = self.offset_base; + let start = (offset_base * self.pages) + (cur_chap * (self.pages * self.sub_chapters)); + start as u64 + *last_page + } + + fn init_log(&mut self, runner: &Runner) { + self.sub_chapters = runner.nodes().len(); + let id = runner.node_id(); + let mut nodes = runner.nodes().to_owned(); + nodes.sort(); + let idx = nodes + .iter() + .enumerate() + .find(|e| e.1.as_str() == id) + .unwrap() + .0; + self.offset_base = idx; + } +} + +impl Node for Roach { + fn handle(&mut self, runner: &Runner, msg: Message) { + let typ = msg.typ(); + + match typ { + "init_log" => { + self.init_log(runner); + } + "send" => { + let key = msg.body.payload.get("key").unwrap().as_str().unwrap(); + let val = msg.body.payload.get("msg").unwrap().as_u64().unwrap(); + let offset = self.next_offset(key); + self.logs + .entry(key.to_string()) + .or_default() + .entry(offset) + .or_insert(val); + + let payload = mk_payload(&[("offset", offset.into())]); + let body = Body::from_type("send_ok").with_payload(payload); + runner.reply(&msg, body); + } + "poll" => { + let offsets = msg + .body + .payload + .get("offsets") + .unwrap() + .as_object() + .unwrap(); + let mut payload = Payload::default(); + for req in offsets.iter() { + if let Some(offsets) = self.logs.get(req.0.as_str()) { + let start = req.1.as_u64().unwrap(); + let subset = offsets + .range(start..) + .map(|(&offset, &val)| vec![offset, val]) + .collect::>(); + payload.insert(req.0.to_string(), subset.into()); + } + } + let payload = mk_payload(&[("msgs", payload.into())]); + let body = Body::from_type("poll_ok").with_payload(payload); + runner.reply(&msg, body); + } + "commit_offsets" => { + runner.reply(&msg, Body::from_type("commit_offsets_ok")); + } + "list_committed_offsets" => { + let offsets = self + .committed_offsets + .iter() + .map(|(s, &n)| (s.clone(), >::into(n))) + .collect::(); + let mut payload = Payload::new(); + payload.insert("offsets".to_string(), offsets.into()); + } + _ => eprintln!("unknown Message type: {msg:?}"), + } + } +}