use lin-kv and gossip to reduce worst-case lag

This commit is contained in:
Joe Ardent 2024-06-07 16:29:14 -07:00
parent 5f6312dfb8
commit 745206b29b

View file

@ -1,75 +1,42 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Payload, Runner}; use nebkor_maelstrom::{kv::Kv, mk_payload, Body, Message, Node, Payload, Runner};
type Logs = HashMap<String, BTreeMap<u64, u64>>; type Logs = HashMap<String, BTreeMap<u64, u64>>;
fn main() { fn main() {
let node = Roach::new(20); let node = Roach::default();
let runner = Runner::new(node); let runner = Runner::new(node);
let on_init = Box::new(on_init); let on_init = Box::new(on_init);
runner.run(Some(on_init)); runner.run(Some(on_init));
} }
fn on_init(runner: &Runner) { fn on_init(runner: &Runner) {
let msg = Message::from_body(Body::from_type("init_log")); let msg = Message::from_body(Body::from_type("do_init"));
runner.get_backdoor().send(msg).unwrap(); runner.get_backdoor().send(msg).unwrap();
} }
#[derive(Debug, Default)]
struct OffsetState {
last_page: usize,
current_chapter: usize,
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Roach { struct Roach {
// these first three get written to as messages come in
logs: Logs, logs: Logs,
state: HashMap<String, OffsetState>,
committed_offsets: HashMap<String, u64>, committed_offsets: HashMap<String, u64>,
last_offsets: HashMap<String, u64>,
// these are initialized on init and then just read neighbors: Vec<String>,
offset_base: usize,
sub_chapters: usize, // number of nodes
pages: usize,
} }
impl Roach { impl Roach {
pub fn new(pages: usize) -> Self { fn gossip(&mut self, runner: &Runner, key: &str, val: u64) {
Roach { let offset = self.last_offsets.get(key).unwrap();
pages, for neighbor in self.neighbors.iter() {
..Default::default() let payload = mk_payload(&[
("key", key.into()),
("value", val.into()),
("offset", (*offset).into()),
]);
let body = Body::from_type("gossip").with_payload(payload);
runner.send(neighbor, body);
} }
} }
fn next_offset(&mut self, key: &str) -> u64 {
let state = self.state.entry(key.to_string()).or_default();
if state.last_page + 1 < self.pages {
state.last_page += 1;
} else {
state.last_page = 0;
state.current_chapter += 1;
}
let cur_chap = state.current_chapter;
let offset_base = self.offset_base;
let start = (offset_base * self.pages) + (cur_chap * (self.pages * self.sub_chapters));
(start + state.last_page) as u64
}
fn init_log(&mut self, runner: &Runner) {
self.sub_chapters = runner.nodes().len();
let id = runner.node_id();
let mut nodes = runner.nodes().to_owned();
nodes.sort();
let idx = nodes
.iter()
.enumerate()
.find(|e| e.1.as_str() == id)
.unwrap()
.0;
self.offset_base = idx;
}
} }
impl Node for Roach { impl Node for Roach {
@ -77,21 +44,59 @@ impl Node for Roach {
let typ = msg.typ(); let typ = msg.typ();
match typ { match typ {
"init_log" => { "do_init" => {
self.init_log(runner); let id = runner.node_id();
let nodes = runner
.nodes()
.iter()
.filter(|n| n.as_str() != id)
.cloned()
.collect();
self.neighbors = nodes;
}
"gossip" => {
let offset = msg.body.payload.get("offset").unwrap().as_u64().unwrap();
let key = msg.body.payload.get("key").unwrap().as_str().unwrap();
let val = msg.body.payload.get("value").unwrap().as_u64().unwrap();
self.logs
.entry(key.to_owned())
.or_default()
.entry(offset)
.or_insert(val);
let last = self.last_offsets.entry(key.to_string()).or_default();
*last = offset.max(*last);
} }
"send" => { "send" => {
let key = msg.body.payload.get("key").unwrap().as_str().unwrap(); let key = msg.body.payload.get("key").unwrap().as_str().unwrap();
let val = msg.body.payload.get("msg").unwrap().as_u64().unwrap(); let val = msg.body.payload.get("msg").unwrap().as_u64().unwrap();
let offset = self.next_offset(key); let kv = Kv::lin();
let offset = self.last_offsets.entry(key.to_string()).or_default();
let mut cur = *offset;
let mut new = cur + 1;
loop {
match kv.cas(runner, key, cur.into(), new.into(), true) {
Ok(_) => {
*offset = new;
break;
}
_ => {
cur = kv.read(runner, key).unwrap().unwrap().as_u64().unwrap();
new = cur + 1;
}
}
}
self.logs self.logs
.entry(key.to_string()) .entry(key.to_string())
.or_default() .or_default()
.entry(offset) .entry(*offset)
.or_insert(val); .or_insert(val);
let payload = mk_payload(&[("offset", offset.into())]); let payload = mk_payload(&[("offset", (*offset).into())]);
let body = Body::from_type("send_ok").with_payload(payload); let body = Body::from_type("send_ok").with_payload(payload);
self.gossip(runner, key, val);
runner.reply(&msg, body); runner.reply(&msg, body);
} }
"poll" => { "poll" => {