non-working gg-counter, empty kv impl

This commit is contained in:
Joe Ardent 2024-05-28 13:35:32 -07:00
parent 6ed80d17fb
commit 9e1d05983d
7 changed files with 189 additions and 6 deletions

9
Cargo.lock generated
View file

@ -36,6 +36,15 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "gg-g_counter"
version = "0.0.1"
dependencies = [
"nebkor-maelstrom",
"rand",
"serde_json",
]
[[package]] [[package]]
name = "gg-uid" name = "gg-uid"
version = "0.0.1" version = "0.0.1"

View file

@ -1,5 +1,5 @@
[workspace] [workspace]
members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom"] members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom", "gg-g_counter"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
@ -9,3 +9,5 @@ license-file = "LICENSE.md"
[workspace.dependencies] [workspace.dependencies]
serde_json = "1" serde_json = "1"
rand = "0.8"

View file

@ -8,4 +8,4 @@ license-file.workspace = true
[dependencies] [dependencies]
serde_json.workspace = true serde_json.workspace = true
nebkor-maelstrom = { path = "../nebkor-maelstrom" } nebkor-maelstrom = { path = "../nebkor-maelstrom" }
rand = "0.8.5" rand.workspace = true

12
gg-g_counter/Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[package]
name = "gg-g_counter"
edition = "2021"
version.workspace = true
authors.workspace = true
license-file.workspace = true
[dependencies]
serde_json.workspace = true
nebkor-maelstrom = { path = "../nebkor-maelstrom" }
rand.workspace = true

78
gg-g_counter/src/main.rs Normal file
View file

@ -0,0 +1,78 @@
use std::{
collections::{HashMap, HashSet},
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread,
time::Duration,
};
use nebkor_maelstrom::{mk_payload, mk_stdin, protocol::Payload, Body, Message, Node, Runner};
use rand::Rng;
fn main() {
let out = std::io::stdout();
let node = Counter;
let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out);
let runner = &runner;
let (i, extra_input, rx) = mk_stdin();
let init = thread::spawn(move || {
thread::sleep(Duration::from_millis(101));
extra_input
.send(Message {
body: Body::from_type("kv_init"),
..Default::default()
})
.unwrap();
});
runner.run(rx);
init.join().unwrap();
i.join().unwrap();
}
const KEY: &str = "COUNTER";
#[derive(Clone, Default)]
struct Counter;
impl Node for Counter {
fn handle(&mut self, runner: &Runner, req: &Message) {
let typ = req.body.typ.as_str();
let frm = req.src.as_str();
let nid = runner.node_id();
let nid = nid.as_str();
let msg_id = req.body.msg_id;
match typ {
"kv_init" => {
let mut nodes = runner.nodes().to_vec();
nodes.sort_unstable();
if nodes[0] == nid {
let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
runner.send("seq-kv", body);
}
}
"add" => {
let mut h = |msg: &Message| {
let irt = msg_id;
let body = Body::from_type("add_ok").with_in_reply_to(irt);
runner.send(frm, body);
};
}
"read" => {}
_ => {
eprintln!("unknown type: {req:?}");
}
}
}
}

View file

@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::Body;
#[derive(Debug, Default, Clone)]
pub struct Kv {
pub service: String,
}
impl Kv {
pub fn new(service: &str) -> Self {
Kv {
service: service.to_string(),
}
}
pub fn read(&self, key: &str) -> Body {
todo!()
}
pub fn write(&self, key: &str, val: Value, create: bool) -> Body {
todo!()
}
pub fn cas(&self, val: Value) -> Body {
todo!()
}
}

View file

@ -1,15 +1,23 @@
use std::{ use std::{
io::{Stdout, Write}, any::Any,
collections::HashMap,
io::{BufRead, Stdout, Write},
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicU64, AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, Mutex, OnceLock, Arc, Mutex, OnceLock,
}, },
thread::{self, JoinHandle},
}; };
pub mod protocol; pub mod protocol;
pub use protocol::{Body, Message}; pub use protocol::{Body, Message, Payload};
use serde_json::Value;
pub mod kv;
pub type DynNode = Arc<Mutex<dyn Node>>; pub type DynNode = Arc<Mutex<dyn Node>>;
pub type Handler = Box<dyn FnMut(&Message)>;
pub trait Node { pub trait Node {
fn handle(&mut self, runner: &Runner, msg: &Message); fn handle(&mut self, runner: &Runner, msg: &Message);
@ -22,6 +30,7 @@ pub struct Runner {
nodes: OnceLock<Vec<String>>, nodes: OnceLock<Vec<String>>,
steps: AtomicUsize, steps: AtomicUsize,
output: Arc<Mutex<Stdout>>, output: Arc<Mutex<Stdout>>,
handlers: Arc<Mutex<HashMap<u64, Handler>>>,
} }
impl Runner { impl Runner {
@ -33,24 +42,42 @@ impl Runner {
node_id: OnceLock::new(), node_id: OnceLock::new(),
steps: AtomicUsize::new(0), steps: AtomicUsize::new(0),
output: Arc::new(Mutex::new(output)), output: Arc::new(Mutex::new(output)),
handlers: Default::default(),
} }
} }
pub fn run(&self, input: std::sync::mpsc::Receiver<Message>) { pub fn run(&self, input: Receiver<Message>) {
for msg in input.iter() { for msg in input.iter() {
self.steps.fetch_add(1, Ordering::SeqCst);
let typ = &msg.body.typ; let typ = &msg.body.typ;
if let "init" = typ.as_str() { if let "init" = typ.as_str() {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
self.reply(&msg, body); self.reply(&msg, body);
} else { } else {
let irt = msg.body.in_reply_to;
{
let mut g = self.handlers.lock().unwrap();
if let Some(mut h) = g.remove(&irt) {
h(&msg);
}
}
let mut n = self.node.lock().unwrap(); let mut n = self.node.lock().unwrap();
n.handle(self, &msg); n.handle(self, &msg);
} }
} }
} }
pub fn rpc(&self, dest: &str, body: Body, handler: Handler) {
let mut body = body;
let msg_id = self.next_msg_id();
body.msg_id = msg_id;
{
let mut g = self.handlers.lock().unwrap();
g.insert(msg_id, handler);
}
self.send(dest, body);
}
pub fn node_id(&self) -> String { pub fn node_id(&self) -> String {
self.node_id.get().cloned().unwrap_or_default() self.node_id.get().cloned().unwrap_or_default()
} }
@ -125,3 +152,30 @@ impl Runner {
out.flush().unwrap(); out.flush().unwrap();
} }
} }
/// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used
/// in the Runner::run() method. Clone the Sender if you want to inject messages
/// into the Runner. Join the handle after `run()`.
pub fn mk_stdin() -> (JoinHandle<()>, Sender<Message>, Receiver<Message>) {
let stdin = std::io::stdin();
let (tx, rx) = channel();
let xtra_input = tx.clone();
let join = thread::spawn(move || {
let g = stdin.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str(&line) {
tx.send(msg).unwrap();
}
}
});
(join, xtra_input, rx)
}
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
payload
.iter()
.map(|p| (p.0.to_string(), p.1.clone()))
.collect()
}