Send less data; no KV.
This has good messages/op (2.3), but bad highest lag (39 seconds): {:perf {:latency-graph {:valid? true}, :rate-graph {:valid? true}, :valid? true}, :timeline {:valid? true}, :exceptions {:valid? true}, :stats {:valid? true, :count 18643, :ok-count 18635, :fail-count 0, :info-count 8, :by-f {:assign {:valid? true, :count 2258, :ok-count 2258, :fail-count 0, :info-count 0}, :crash {:valid? false, :count 8, :ok-count 0, :fail-count 0, :info-count 8}, :poll {:valid? true, :count 8537, :ok-count 8537, :fail-count 0, :info-count 0}, :send {:valid? true, :count 7840, :ok-count 7840, :fail-count 0, :info-count 0}}}, :availability {:valid? true, :ok-fraction 0.9995709}, :net {:all {:send-count 43768, :recv-count 43768, :msg-count 43768, :msgs-per-op 2.3476908}, :clients {:send-count 43768, :recv-count 43768, :msg-count 43768}, :servers {:send-count 0, :recv-count 0, :msg-count 0, :msgs-per-op 0.0}, :valid? true}, :workload {:valid? true, :worst-realtime-lag {:time 39.787998673, :process 6, :key "9", :lag 39.771318684}, :bad-error-types (), :error-types (), :info-txn-causes ()}, :valid? true}
This commit is contained in:
parent
a492ca56a4
commit
5f6312dfb8
1 changed files with 13 additions and 9 deletions
|
@ -103,14 +103,14 @@ impl Node for Roach {
|
|||
.as_object()
|
||||
.unwrap();
|
||||
let mut payload = Payload::default();
|
||||
for req in offsets.iter() {
|
||||
if let Some(offsets) = self.logs.get(req.0.as_str()) {
|
||||
let start = req.1.as_u64().unwrap();
|
||||
for (key, start) in offsets.iter() {
|
||||
if let Some(offsets) = self.logs.get(key.as_str()) {
|
||||
let start = start.as_u64().unwrap();
|
||||
let subset = offsets
|
||||
.range(start..)
|
||||
.map(|(&offset, &val)| vec![offset, val])
|
||||
.collect::<Vec<_>>();
|
||||
payload.insert(req.0.to_string(), subset.into());
|
||||
payload.insert(key.to_owned(), subset.into());
|
||||
}
|
||||
}
|
||||
let payload = mk_payload(&[("msgs", payload.into())]);
|
||||
|
@ -133,11 +133,15 @@ impl Node for Roach {
|
|||
runner.reply(&msg, Body::from_type("commit_offsets_ok"));
|
||||
}
|
||||
"list_committed_offsets" => {
|
||||
let offsets = self
|
||||
.committed_offsets
|
||||
.iter()
|
||||
.map(|(s, &n)| (s.clone(), <u64 as Into<serde_json::Value>>::into(n)))
|
||||
.collect::<Payload>();
|
||||
let keys = msg.body.payload.get("keys").unwrap().as_array().unwrap();
|
||||
let mut offsets = Payload::default();
|
||||
|
||||
for key in keys.iter() {
|
||||
let key = key.as_str().unwrap();
|
||||
if let Some(&offset) = self.committed_offsets.get(key) {
|
||||
offsets.insert(key.to_string(), offset.into());
|
||||
}
|
||||
}
|
||||
let mut payload = Payload::new();
|
||||
payload.insert("offsets".to_string(), offsets.into());
|
||||
let body = Body::from_type("list_committed_offsets_ok").with_payload(payload);
|
||||
|
|
Loading…
Reference in a new issue