diff --git a/Cargo.lock b/Cargo.lock index 43dc217..f7b1a4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "gg-g_counter" +version = "0.0.1" +dependencies = [ + "nebkor-maelstrom", + "rand", + "serde_json", +] + [[package]] name = "gg-uid" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index d331696..bb1b15b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom"] +members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom", "gg-g_counter"] resolver = "2" [workspace.package] @@ -9,3 +9,5 @@ license-file = "LICENSE.md" [workspace.dependencies] serde_json = "1" +rand = "0.8" + diff --git a/gg-broadcast/Cargo.toml b/gg-broadcast/Cargo.toml index 60f714d..a713c61 100644 --- a/gg-broadcast/Cargo.toml +++ b/gg-broadcast/Cargo.toml @@ -8,4 +8,4 @@ license-file.workspace = true [dependencies] serde_json.workspace = true nebkor-maelstrom = { path = "../nebkor-maelstrom" } -rand = "0.8.5" +rand.workspace = true diff --git a/gg-g_counter/Cargo.toml b/gg-g_counter/Cargo.toml new file mode 100644 index 0000000..eb83a0d --- /dev/null +++ b/gg-g_counter/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "gg-g_counter" +edition = "2021" +version.workspace = true +authors.workspace = true +license-file.workspace = true + +[dependencies] +serde_json.workspace = true +nebkor-maelstrom = { path = "../nebkor-maelstrom" } +rand.workspace = true + diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs new file mode 100644 index 0000000..cad704a --- /dev/null +++ b/gg-g_counter/src/main.rs @@ -0,0 +1,78 @@ +use std::{ + collections::{HashMap, HashSet}, + io::BufRead, + sync::{mpsc::channel, Arc, Mutex}, + thread, + time::Duration, +}; + +use nebkor_maelstrom::{mk_payload, mk_stdin, protocol::Payload, Body, Message, Node, Runner}; +use rand::Rng; + +fn main() { + let out = std::io::stdout(); + + let node = Counter; + let node = Arc::new(Mutex::new(node)); + + let runner = Runner::new(node, out); + let runner = &runner; + + let (i, extra_input, rx) = mk_stdin(); + let init = thread::spawn(move || { + thread::sleep(Duration::from_millis(101)); + extra_input + .send(Message { + body: Body::from_type("kv_init"), + ..Default::default() + }) + .unwrap(); + }); + + runner.run(rx); + init.join().unwrap(); + i.join().unwrap(); +} + +const KEY: &str = "COUNTER"; + +#[derive(Clone, Default)] +struct Counter; + +impl Node for Counter { + fn handle(&mut self, runner: &Runner, req: &Message) { + let typ = req.body.typ.as_str(); + let frm = req.src.as_str(); + let nid = runner.node_id(); + let nid = nid.as_str(); + let msg_id = req.body.msg_id; + + match typ { + "kv_init" => { + let mut nodes = runner.nodes().to_vec(); + nodes.sort_unstable(); + if nodes[0] == nid { + let payload = mk_payload(&[ + ("key", KEY.into()), + ("from", 0i64.into()), + ("to", 0i64.into()), + ("create_if_not_exists", true.into()), + ]); + let body = Body::from_type("cas").with_payload(payload); + runner.send("seq-kv", body); + } + } + "add" => { + let mut h = |msg: &Message| { + let irt = msg_id; + let body = Body::from_type("add_ok").with_in_reply_to(irt); + runner.send(frm, body); + }; + } + "read" => {} + _ => { + eprintln!("unknown type: {req:?}"); + } + } + } +} diff --git a/nebkor-maelstrom/src/kv.rs b/nebkor-maelstrom/src/kv.rs new file mode 100644 index 0000000..10296aa --- /dev/null +++ b/nebkor-maelstrom/src/kv.rs @@ -0,0 +1,28 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::Body; + +#[derive(Debug, Default, Clone)] +pub struct Kv { + pub service: String, +} + +impl Kv { + pub fn new(service: &str) -> Self { + Kv { + service: service.to_string(), + } + } + pub fn read(&self, key: &str) -> Body { + todo!() + } + + pub fn write(&self, key: &str, val: Value, create: bool) -> Body { + todo!() + } + + pub fn cas(&self, val: Value) -> Body { + todo!() + } +} diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index f6c929a..f7df692 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -1,15 +1,23 @@ use std::{ - io::{Stdout, Write}, + any::Any, + collections::HashMap, + io::{BufRead, Stdout, Write}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, + mpsc::{channel, Receiver, Sender}, Arc, Mutex, OnceLock, }, + thread::{self, JoinHandle}, }; pub mod protocol; -pub use protocol::{Body, Message}; +pub use protocol::{Body, Message, Payload}; +use serde_json::Value; + +pub mod kv; pub type DynNode = Arc>; +pub type Handler = Box; pub trait Node { fn handle(&mut self, runner: &Runner, msg: &Message); @@ -22,6 +30,7 @@ pub struct Runner { nodes: OnceLock>, steps: AtomicUsize, output: Arc>, + handlers: Arc>>, } impl Runner { @@ -33,24 +42,42 @@ impl Runner { node_id: OnceLock::new(), steps: AtomicUsize::new(0), output: Arc::new(Mutex::new(output)), + handlers: Default::default(), } } - pub fn run(&self, input: std::sync::mpsc::Receiver) { + pub fn run(&self, input: Receiver) { for msg in input.iter() { - self.steps.fetch_add(1, Ordering::SeqCst); let typ = &msg.body.typ; if let "init" = typ.as_str() { self.init(&msg); let body = Body::from_type("init_ok"); self.reply(&msg, body); } else { + let irt = msg.body.in_reply_to; + { + let mut g = self.handlers.lock().unwrap(); + if let Some(mut h) = g.remove(&irt) { + h(&msg); + } + } let mut n = self.node.lock().unwrap(); n.handle(self, &msg); } } } + pub fn rpc(&self, dest: &str, body: Body, handler: Handler) { + let mut body = body; + let msg_id = self.next_msg_id(); + body.msg_id = msg_id; + { + let mut g = self.handlers.lock().unwrap(); + g.insert(msg_id, handler); + } + self.send(dest, body); + } + pub fn node_id(&self) -> String { self.node_id.get().cloned().unwrap_or_default() } @@ -125,3 +152,30 @@ impl Runner { out.flush().unwrap(); } } + +/// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used +/// in the Runner::run() method. Clone the Sender if you want to inject messages +/// into the Runner. Join the handle after `run()`. +pub fn mk_stdin() -> (JoinHandle<()>, Sender, Receiver) { + let stdin = std::io::stdin(); + let (tx, rx) = channel(); + let xtra_input = tx.clone(); + + let join = thread::spawn(move || { + let g = stdin.lock(); + for line in g.lines().map_while(Result::ok) { + if let Ok(msg) = serde_json::from_str(&line) { + tx.send(msg).unwrap(); + } + } + }); + + (join, xtra_input, rx) +} + +pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { + payload + .iter() + .map(|p| (p.0.to_string(), p.1.clone())) + .collect() +}