passes 5a

This commit is contained in:
Joe Ardent 2024-06-06 22:19:13 -07:00
parent 8ada835f81
commit 5222d55a61
4 changed files with 149 additions and 1 deletions

8
Cargo.lock generated
View file

@ -45,6 +45,14 @@ dependencies = [
"serde_json",
]
[[package]]
name = "gg-kafka"
version = "0.0.1"
dependencies = [
"nebkor-maelstrom",
"serde_json",
]
[[package]]
name = "gg-uid"
version = "0.0.1"

View file

@ -1,5 +1,5 @@
[workspace]
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter"]
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka"]
resolver = "2"
[workspace.package]

10
gg-kafka/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "gg-kafka"
edition = "2021"
version.workspace = true
authors.workspace = true
license-file.workspace = true
[dependencies]
nebkor-maelstrom.workspace = true
serde_json.workspace = true

130
gg-kafka/src/main.rs Normal file
View file

@ -0,0 +1,130 @@
use std::collections::{BTreeMap, HashMap};
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Payload, Runner};
type Logs = HashMap<String, BTreeMap<u64, u64>>;
fn main() {
let node = Roach::new(20);
let runner = Runner::new(node);
let on_init = Box::new(on_init);
runner.run(Some(on_init));
}
fn on_init(runner: &Runner) {
let msg = Message::from_body(Body::from_type("init_log"));
runner.get_backdoor().send(msg).unwrap();
}
#[derive(Debug, Default)]
struct Roach {
// these first four get written to as messages come in
logs: Logs,
last_pages: HashMap<String, u64>,
current_chapter: usize,
committed_offsets: HashMap<String, u64>,
// these are initialized on init and then just read
offset_base: usize,
sub_chapters: usize, // number of nodes
pages: usize,
}
impl Roach {
pub fn new(pages: usize) -> Self {
Roach {
pages,
..Default::default()
}
}
fn next_offset(&mut self, key: &str) -> u64 {
let last_page = self.last_pages.entry(key.to_string()).or_default();
if *last_page + 1 < self.pages as u64 {
*last_page += 1;
} else {
*last_page = 0;
self.current_chapter += 1;
}
let cur_chap = self.current_chapter;
let offset_base = self.offset_base;
let start = (offset_base * self.pages) + (cur_chap * (self.pages * self.sub_chapters));
start as u64 + *last_page
}
fn init_log(&mut self, runner: &Runner) {
self.sub_chapters = runner.nodes().len();
let id = runner.node_id();
let mut nodes = runner.nodes().to_owned();
nodes.sort();
let idx = nodes
.iter()
.enumerate()
.find(|e| e.1.as_str() == id)
.unwrap()
.0;
self.offset_base = idx;
}
}
impl Node for Roach {
fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = msg.typ();
match typ {
"init_log" => {
self.init_log(runner);
}
"send" => {
let key = msg.body.payload.get("key").unwrap().as_str().unwrap();
let val = msg.body.payload.get("msg").unwrap().as_u64().unwrap();
let offset = self.next_offset(key);
self.logs
.entry(key.to_string())
.or_default()
.entry(offset)
.or_insert(val);
let payload = mk_payload(&[("offset", offset.into())]);
let body = Body::from_type("send_ok").with_payload(payload);
runner.reply(&msg, body);
}
"poll" => {
let offsets = msg
.body
.payload
.get("offsets")
.unwrap()
.as_object()
.unwrap();
let mut payload = Payload::default();
for req in offsets.iter() {
if let Some(offsets) = self.logs.get(req.0.as_str()) {
let start = req.1.as_u64().unwrap();
let subset = offsets
.range(start..)
.map(|(&offset, &val)| vec![offset, val])
.collect::<Vec<_>>();
payload.insert(req.0.to_string(), subset.into());
}
}
let payload = mk_payload(&[("msgs", payload.into())]);
let body = Body::from_type("poll_ok").with_payload(payload);
runner.reply(&msg, body);
}
"commit_offsets" => {
runner.reply(&msg, Body::from_type("commit_offsets_ok"));
}
"list_committed_offsets" => {
let offsets = self
.committed_offsets
.iter()
.map(|(s, &n)| (s.clone(), <u64 as Into<serde_json::Value>>::into(n)))
.collect::<Payload>();
let mut payload = Payload::new();
payload.insert("offsets".to_string(), offsets.into());
}
_ => eprintln!("unknown Message type: {msg:?}"),
}
}
}