chatty-catties/gg-kafka/src/main.rs
2024-06-07 13:12:37 -07:00

149 lines
4.9 KiB
Rust

use std::collections::{BTreeMap, HashMap};
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Payload, Runner};
/// Per-key append-only log: key -> (offset -> message value).
/// A `BTreeMap` keeps offsets ordered so `poll` can range-scan from a
/// requested starting offset.
type Logs = HashMap<String, BTreeMap<u64, u64>>;
/// Entry point: build a log node that allocates offsets in 20-page chapters
/// and hand it to the Maelstrom runner, kicking things off with `on_init`.
fn main() {
    let runner = Runner::new(Roach::new(20));
    runner.run(Some(Box::new(on_init)));
}
/// Runner init hook: injects a synthetic "init_log" message through the
/// backdoor channel so the node computes its cluster position (see
/// `Roach::init_log`) once membership is known, before any client traffic.
fn on_init(runner: &Runner) {
    let msg = Message::from_body(Body::from_type("init_log"));
    // If this send fails the node can never allocate offsets correctly, so
    // aborting is the right call — but say why instead of a bare unwrap.
    runner
        .get_backdoor()
        .send(msg)
        .expect("backdoor channel closed before init_log could be delivered");
}
/// Per-key cursor for offset allocation (see `Roach::next_offset`).
#[derive(Debug, Default)]
struct OffsetState {
    // Page cursor within the current chapter; wraps at `Roach::pages`.
    last_page: usize,
    // Number of chapter wrap-arounds this key has gone through so far.
    current_chapter: usize,
}
/// A replicated-log node. Each node owns a disjoint slice of the offset
/// space per key, so offsets can be allocated without coordination.
#[derive(Debug, Default)]
struct Roach {
    // these first three get written to as messages come in
    /// Append-only message log, per key.
    logs: Logs,
    /// Offset-allocation cursor, per key.
    state: HashMap<String, OffsetState>,
    /// Committed offset per key, as reported via "commit_offsets" messages.
    committed_offsets: HashMap<String, u64>,
    // these are initialized on init and then just read
    /// This node's index in the sorted list of cluster node ids.
    offset_base: usize,
    sub_chapters: usize, // number of nodes
    /// Offsets handed out per chapter by each node.
    pages: usize,
}
impl Roach {
    /// Create a node that allocates offsets in runs ("chapters") of `pages`
    /// offsets per key.
    pub fn new(pages: usize) -> Self {
        Roach {
            pages,
            ..Default::default()
        }
    }

    /// Return the next unused offset for `key`.
    ///
    /// Layout: within every chapter, node `offset_base` owns pages
    /// `[offset_base * pages, (offset_base + 1) * pages)`, and a full chapter
    /// spans `pages * sub_chapters` offsets cluster-wide — so no two nodes
    /// can ever hand out the same offset for the same key.
    fn next_offset(&mut self, key: &str) -> u64 {
        let state = self.state.entry(key.to_string()).or_default();
        // Hand out the page the cursor currently points at, *then* advance.
        // (Advancing first — as a previous revision did — skipped page 0 of
        // each key's first chapter, wasting one offset and making chapter 0
        // one page short.)
        let page = state.last_page;
        let chapter = state.current_chapter;
        if state.last_page + 1 < self.pages {
            state.last_page += 1;
        } else {
            state.last_page = 0;
            state.current_chapter += 1;
        }
        let start = (self.offset_base * self.pages) + (chapter * (self.pages * self.sub_chapters));
        (start + page) as u64
    }

    /// Learn this node's stable position in the cluster: sort the node ids
    /// and use our index as `offset_base`, and the cluster size as the
    /// number of per-chapter sub-ranges.
    fn init_log(&mut self, runner: &Runner) {
        self.sub_chapters = runner.nodes().len();
        let id = runner.node_id();
        let mut nodes = runner.nodes().to_owned();
        nodes.sort();
        self.offset_base = nodes
            .iter()
            .position(|n| n.as_str() == id)
            .expect("own node id must appear in the cluster node list");
    }
}
impl Node for Roach {
    /// Dispatch one incoming Maelstrom message by its type.
    fn handle(&mut self, runner: &Runner, msg: Message) {
        let typ = msg.typ();
        match typ {
            // Internal kick-off message injected by `on_init`: learn our
            // place in the cluster before serving any log traffic.
            "init_log" => {
                self.init_log(runner);
            }
            // Append a value to `key`'s log at a freshly allocated offset
            // and echo the offset back.
            "send" => {
                let key = msg.body.payload.get("key").unwrap().as_str().unwrap();
                let val = msg.body.payload.get("msg").unwrap().as_u64().unwrap();
                let offset = self.next_offset(key);
                self.logs
                    .entry(key.to_string())
                    .or_default()
                    .entry(offset)
                    .or_insert(val);
                let payload = mk_payload(&[("offset", offset.into())]);
                let body = Body::from_type("send_ok").with_payload(payload);
                runner.reply(&msg, body);
            }
            // For each requested key, return every [offset, msg] pair at or
            // after the requested starting offset (keys we hold no log for
            // are simply omitted).
            "poll" => {
                let offsets = msg
                    .body
                    .payload
                    .get("offsets")
                    .unwrap()
                    .as_object()
                    .unwrap();
                let mut payload = Payload::default();
                for req in offsets.iter() {
                    if let Some(offsets) = self.logs.get(req.0.as_str()) {
                        let start = req.1.as_u64().unwrap();
                        let subset = offsets
                            .range(start..)
                            .map(|(&offset, &val)| vec![offset, val])
                            .collect::<Vec<_>>();
                        payload.insert(req.0.to_string(), subset.into());
                    }
                }
                let payload = mk_payload(&[("msgs", payload.into())]);
                let body = Body::from_type("poll_ok").with_payload(payload);
                runner.reply(&msg, body);
            }
            // Record the client's committed offsets. A later commit must
            // overwrite an earlier one, so use `insert`: the previous
            // `or_insert` kept only the *first* commit per key and silently
            // dropped every subsequent (higher) commit.
            "commit_offsets" => {
                let offsets = msg
                    .body
                    .payload
                    .get("offsets")
                    .unwrap()
                    .as_object()
                    .unwrap();
                for (k, v) in offsets.iter() {
                    self.committed_offsets
                        .insert(k.to_string(), v.as_u64().unwrap());
                }
                runner.reply(&msg, Body::from_type("commit_offsets_ok"));
            }
            // Report every committed offset we know about.
            "list_committed_offsets" => {
                let offsets = self
                    .committed_offsets
                    .iter()
                    .map(|(s, &n)| (s.clone(), serde_json::Value::from(n)))
                    .collect::<Payload>();
                let payload = mk_payload(&[("offsets", offsets.into())]);
                let body = Body::from_type("list_committed_offsets_ok").with_payload(payload);
                runner.reply(&msg, body);
            }
            _ => eprintln!("unknown Message type: {msg:?}"),
        }
    }
}