From 5f6312dfb81d07417bffc5b22581c53fe1ee1345 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Fri, 7 Jun 2024 14:13:10 -0700 Subject: [PATCH] Send less data; no KV. This has good messages/op (2.3), but bad highest lag (39 seconds): {:perf {:latency-graph {:valid? true}, :rate-graph {:valid? true}, :valid? true}, :timeline {:valid? true}, :exceptions {:valid? true}, :stats {:valid? true, :count 18643, :ok-count 18635, :fail-count 0, :info-count 8, :by-f {:assign {:valid? true, :count 2258, :ok-count 2258, :fail-count 0, :info-count 0}, :crash {:valid? false, :count 8, :ok-count 0, :fail-count 0, :info-count 8}, :poll {:valid? true, :count 8537, :ok-count 8537, :fail-count 0, :info-count 0}, :send {:valid? true, :count 7840, :ok-count 7840, :fail-count 0, :info-count 0}}}, :availability {:valid? true, :ok-fraction 0.9995709}, :net {:all {:send-count 43768, :recv-count 43768, :msg-count 43768, :msgs-per-op 2.3476908}, :clients {:send-count 43768, :recv-count 43768, :msg-count 43768}, :servers {:send-count 0, :recv-count 0, :msg-count 0, :msgs-per-op 0.0}, :valid? true}, :workload {:valid? true, :worst-realtime-lag {:time 39.787998673, :process 6, :key "9", :lag 39.771318684}, :bad-error-types (), :error-types (), :info-txn-causes ()}, :valid? true} --- gg-kafka/src/main.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/gg-kafka/src/main.rs b/gg-kafka/src/main.rs index 79dbd6d..0baff4e 100644 --- a/gg-kafka/src/main.rs +++ b/gg-kafka/src/main.rs @@ -103,14 +103,14 @@ impl Node for Roach { .as_object() .unwrap(); let mut payload = Payload::default(); - for req in offsets.iter() { - if let Some(offsets) = self.logs.get(req.0.as_str()) { - let start = req.1.as_u64().unwrap(); + for (key, start) in offsets.iter() { + if let Some(offsets) = self.logs.get(key.as_str()) { + let start = start.as_u64().unwrap(); let subset = offsets .range(start..) .map(|(&offset, &val)| vec![offset, val]) .collect::>(); - payload.insert(req.0.to_string(), subset.into()); + payload.insert(key.to_owned(), subset.into()); } } let payload = mk_payload(&[("msgs", payload.into())]); @@ -133,11 +133,15 @@ impl Node for Roach { runner.reply(&msg, Body::from_type("commit_offsets_ok")); } "list_committed_offsets" => { - let offsets = self - .committed_offsets - .iter() - .map(|(s, &n)| (s.clone(), >::into(n))) - .collect::(); + let keys = msg.body.payload.get("keys").unwrap().as_array().unwrap(); + let mut offsets = Payload::default(); + + for key in keys.iter() { + let key = key.as_str().unwrap(); + if let Some(&offset) = self.committed_offsets.get(key) { + offsets.insert(key.to_string(), offset.into()); + } + } let mut payload = Payload::new(); payload.insert("offsets".to_string(), offsets.into()); let body = Body::from_type("list_committed_offsets_ok").with_payload(payload);