From a492ca56a4c5b081baeccee72b3d29a8e9d0eb39 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Fri, 7 Jun 2024 13:12:37 -0700 Subject: [PATCH] actually implement list_committed_offsets --- gg-kafka/src/main.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/gg-kafka/src/main.rs b/gg-kafka/src/main.rs index 9c64b69..79dbd6d 100644 --- a/gg-kafka/src/main.rs +++ b/gg-kafka/src/main.rs @@ -17,11 +17,16 @@ fn on_init(runner: &Runner) { } #[derive(Debug, Default)] -struct Roach { - // these first four get written to as messages come in - logs: Logs, - last_pages: HashMap, +struct OffsetState { + last_page: usize, current_chapter: usize, +} + +#[derive(Debug, Default)] +struct Roach { + // these first three get written to as messages come in + logs: Logs, + state: HashMap, committed_offsets: HashMap, // these are initialized on init and then just read @@ -39,17 +44,17 @@ impl Roach { } fn next_offset(&mut self, key: &str) -> u64 { - let last_page = self.last_pages.entry(key.to_string()).or_default(); - if *last_page + 1 < self.pages as u64 { - *last_page += 1; + let state = self.state.entry(key.to_string()).or_default(); + if state.last_page + 1 < self.pages { + state.last_page += 1; } else { - *last_page = 0; - self.current_chapter += 1; + state.last_page = 0; + state.current_chapter += 1; } - let cur_chap = self.current_chapter; + let cur_chap = state.current_chapter; let offset_base = self.offset_base; let start = (offset_base * self.pages) + (cur_chap * (self.pages * self.sub_chapters)); - start as u64 + *last_page + (start + state.last_page) as u64 } fn init_log(&mut self, runner: &Runner) { @@ -113,6 +118,18 @@ impl Node for Roach { runner.reply(&msg, body); } "commit_offsets" => { + let offsets = msg + .body + .payload + .get("offsets") + .unwrap() + .as_object() + .unwrap(); + for (k, v) in offsets.iter() { + self.committed_offsets + .entry(k.to_string()) + .or_insert(v.as_u64().unwrap()); + } runner.reply(&msg, Body::from_type("commit_offsets_ok")); } "list_committed_offsets" => { @@ -123,6 +140,8 @@ impl Node for Roach { .collect::(); let mut payload = Payload::new(); payload.insert("offsets".to_string(), offsets.into()); + let body = Body::from_type("list_committed_offsets_ok").with_payload(payload); + runner.reply(&msg, body); } _ => eprintln!("unknown Message type: {msg:?}"), }