diff --git a/gg-kafka/src/main.rs b/gg-kafka/src/main.rs index 79dbd6d..0baff4e 100644 --- a/gg-kafka/src/main.rs +++ b/gg-kafka/src/main.rs @@ -103,14 +103,14 @@ impl Node for Roach { .as_object() .unwrap(); let mut payload = Payload::default(); - for req in offsets.iter() { - if let Some(offsets) = self.logs.get(req.0.as_str()) { - let start = req.1.as_u64().unwrap(); + for (key, start) in offsets.iter() { + if let Some(offsets) = self.logs.get(key.as_str()) { + let start = start.as_u64().unwrap(); let subset = offsets .range(start..) .map(|(&offset, &val)| vec![offset, val]) .collect::>(); - payload.insert(req.0.to_string(), subset.into()); + payload.insert(key.to_owned(), subset.into()); } } let payload = mk_payload(&[("msgs", payload.into())]); @@ -133,11 +133,15 @@ impl Node for Roach { runner.reply(&msg, Body::from_type("commit_offsets_ok")); } "list_committed_offsets" => { - let offsets = self - .committed_offsets - .iter() - .map(|(s, &n)| (s.clone(), >::into(n))) - .collect::(); + let keys = msg.body.payload.get("keys").unwrap().as_array().unwrap(); + let mut offsets = Payload::default(); + + for key in keys.iter() { + let key = key.as_str().unwrap(); + if let Some(&offset) = self.committed_offsets.get(key) { + offsets.insert(key.to_string(), offset.into()); + } + } let mut payload = Payload::new(); payload.insert("offsets".to_string(), offsets.into()); let body = Body::from_type("list_committed_offsets_ok").with_payload(payload);