From c53edd921af7eab779d1ef8d4e0da3d4d66ef5c2 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Thu, 30 May 2024 14:36:05 -0700 Subject: [PATCH] deadlocks on writing to stdout --- gg-g_counter/src/main.rs | 30 +++++++++++++++++++++++++----- nebkor-maelstrom/src/kv.rs | 34 ++++++++++++++++++++++++++++------ nebkor-maelstrom/src/lib.rs | 26 ++++++++++++++------------ 3 files changed, 67 insertions(+), 23 deletions(-) diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index 9dbdf77..b526f55 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -3,12 +3,10 @@ use std::sync::{Arc, Mutex}; use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner}; fn main() { - let out = std::io::stdout(); - let node = Counter; let node = Arc::new(Mutex::new(node)); - let runner = Runner::new(node, out); + let runner = Runner::new(node); let (i, _, rx) = mk_stdin(); @@ -39,10 +37,32 @@ impl Node for Counter { let typ = req.body.typ.as_str(); let frm = req.src.clone(); let msg_id = req.body.msg_id.to_owned(); - match typ { "add" => { - runner.reply(&req, Body::from_type("add_ok")); + let read_runner = runner.clone(); + let cas_runner = runner.clone(); + let add_req = req.clone(); + + let cas_handler = move |_msg: Message| { + let req = add_req; + cas_runner.reply(&req, Body::from_type("add_ok")); + }; + + let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap(); + let read_handler = move |msg: Message| { + let value = msg.body.payload.get("value").unwrap().as_i64().unwrap(); + let payload = mk_payload(&[ + ("from", value.into()), + ("to", (value + delta).into()), + ("key", KEY.into()), + ]); + let body = Body::from_type("cas").with_payload(payload); + read_runner.rpc("seq-kv", body, Box::new(cas_handler)); + }; + // kick it off by calling "read" on seq-kv: + let payload = mk_payload(&[("key", KEY.into())]); + let body = Body::from_type("read").with_payload(payload); + runner.rpc("seq-kv", body, Box::new(read_handler)); } "read" => { let rn = runner.clone(); diff --git a/nebkor-maelstrom/src/kv.rs b/nebkor-maelstrom/src/kv.rs index 3fc224b..704698f 100644 --- a/nebkor-maelstrom/src/kv.rs +++ b/nebkor-maelstrom/src/kv.rs @@ -1,7 +1,9 @@ /* +use std::{rc::Rc, sync::Arc}; + use serde_json::Value; -use crate::Body; +use crate::{mk_payload, Body, Message, Result, Runner}; #[derive(Debug, Default, Clone)] pub struct Kv { @@ -9,20 +11,40 @@ pub struct Kv { } impl Kv { - pub fn new(service: &str) -> Self { + pub fn seq() -> Self { Kv { - service: service.to_string(), + service: "seq-kv".to_string(), } } - pub fn read(&self, key: &str) -> Body { + + pub fn lin() -> Self { + Kv { + service: "lin-kv".to_string(), + } + } + + pub fn lww() -> Self { + Kv { + service: "lww-kv".to_string(), + } + } + + pub fn read(&self, runner: &Runner, key: &str) -> Result { todo!() } - pub fn write(&self, key: &str, val: Value, create: bool) -> Body { + pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> { todo!() } - pub fn cas(&self, key: &str, from: Value, to: Value, create: bool) -> Body { + pub fn cas( + &self, + runner: &Runner, + key: &str, + from: Value, + to: Value, + create: bool, + ) -> Result<()> { todo!() } } diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index 212582b..7de0141 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -4,45 +4,47 @@ use std::{ sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, mpsc::{channel, Receiver, Sender}, - Arc, Mutex, OnceLock, + Arc, LazyLock, Mutex, OnceLock, }, thread::{self, JoinHandle}, }; pub mod protocol; +use protocol::ErrorCode; pub use protocol::{Body, Message, Payload}; use serde_json::Value; pub mod kv; pub type DynNode = Arc>; -pub type Handler = Box; +pub type Handler = Box; // -> Result>>; pub type OnInit = Box; +pub type Result = std::result::Result; + +static MSG_ID: AtomicU64 = AtomicU64::new(0); +static OUTPUT: LazyLock = LazyLock::new(std::io::stdout); + pub trait Node { fn handle(&mut self, runner: &Runner, msg: Message); } #[derive(Clone)] pub struct Runner { - msg_id: Arc, node: DynNode, node_id: OnceLock, nodes: OnceLock>, steps: Arc, - output: Arc>, handlers: Arc>>, } impl Runner { - pub fn new(node: DynNode, output: Stdout) -> Self { + pub fn new(node: DynNode) -> Self { Runner { node, - msg_id: Arc::new(AtomicU64::new(1)), nodes: OnceLock::new(), node_id: OnceLock::new(), steps: Arc::new(AtomicUsize::new(0)), - output: Arc::new(Mutex::new(output)), handlers: Default::default(), } } @@ -61,7 +63,7 @@ impl Runner { let irt = msg.body.in_reply_to; { let mut g = self.handlers.lock().unwrap(); - if let Some(mut h) = g.remove(&irt) { + if let Some(h) = g.remove(&irt) { h(msg.clone()); } } @@ -87,11 +89,11 @@ impl Runner { } pub fn next_msg_id(&self) -> u64 { - self.msg_id.fetch_add(1, Ordering::SeqCst) + MSG_ID.fetch_add(1, Ordering::SeqCst) } pub fn cur_msg_id(&self) -> u64 { - self.msg_id.load(Ordering::SeqCst) + MSG_ID.load(Ordering::SeqCst) } pub fn nodes(&self) -> &[String] { @@ -150,7 +152,7 @@ impl Runner { } fn writeln(&self, msg: &str) { - let mut out = self.output.lock().unwrap(); + let mut out = OUTPUT.lock(); let msg = format!("{msg}\n"); out.write_all(msg.as_bytes()).unwrap(); out.flush().unwrap(); @@ -167,7 +169,7 @@ pub fn mk_stdin() -> (JoinHandle<()>, Sender, Receiver) { let join = thread::spawn(move || { let g = stdin.lock(); - for line in g.lines().map_while(Result::ok) { + for line in g.lines().map_while(std::result::Result::ok) { if let Ok(msg) = serde_json::from_str(&line) { tx.send(msg).unwrap(); }