Compare commits

...

3 commits

Author SHA1 Message Date
Joe Ardent
e6d76ba37c still deadlocking 2024-05-30 21:52:52 -07:00
Joe Ardent
c53edd921a deadlocks on writing to stdout 2024-05-30 14:36:05 -07:00
Joe Ardent
8638ffe67f add support for callback on init in the runner. 2024-05-29 14:46:09 -07:00
6 changed files with 94 additions and 53 deletions

View file

@ -42,7 +42,7 @@ fn main() {
tx.send(msg).unwrap(); tx.send(msg).unwrap();
}); });
runner.run(rx); runner.run(rx, None);
let _ = i.join(); let _ = i.join();
let _ = g.join(); let _ = g.join();
} }

View file

@ -39,6 +39,6 @@ fn main() {
let runner = Runner::new(node, out); let runner = Runner::new(node, out);
runner.run(rx); runner.run(rx, None);
i.join().unwrap(); i.join().unwrap();
} }

View file

@ -1,32 +1,32 @@
use std::{ use std::{
rc::Rc,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread,
time::Duration,
}; };
use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner}; use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner};
fn main() { fn main() {
let out = std::io::stdout();
let node = Counter; let node = Counter;
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Rc::new(Runner::new(node));
let (i, extra_input, rx) = mk_stdin(); let (i, _, rx) = mk_stdin();
let init = thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
extra_input
.send(Message {
body: Body::from_type("kv_init"),
..Default::default()
})
.unwrap();
});
runner.run(rx); let on_init = |rnr: &Runner| {
init.join().unwrap(); let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
rnr.send("seq-kv", body);
};
let on_init = Box::new(on_init);
runner.run(rx, Some(on_init));
i.join().unwrap(); i.join().unwrap();
} }
@ -36,24 +36,36 @@ const KEY: &str = "COUNTER";
struct Counter; struct Counter;
impl Node for Counter { impl Node for Counter {
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) { fn handle<'slf>(&'slf mut self, runner: &'slf Rc<Runner>, req: Message) {
let typ = req.body.typ.as_str(); let typ = req.body.typ.as_str();
let frm = req.src.clone(); let frm = req.src.clone();
let msg_id = req.body.msg_id.to_owned(); let msg_id = req.body.msg_id.to_owned();
match typ { match typ {
"kv_init" => {
let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
runner.send("seq-kv", body);
}
"add" => { "add" => {
runner.reply(&req, Body::from_type("add_ok")); let read_runner = runner.clone();
let cas_runner = runner.clone();
let add_req = req.clone();
let cas_handler = move |_msg: Message| {
let req = add_req;
cas_runner.reply(&req, Body::from_type("add_ok"));
};
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
let read_handler = move |msg: Message| {
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
let payload = mk_payload(&[
("from", value.into()),
("to", (value + delta).into()),
("key", KEY.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
read_runner.rpc("seq-kv", body, Box::new(cas_handler));
};
// kick it off by calling "read" on seq-kv:
let payload = mk_payload(&[("key", KEY.into())]);
let body = Body::from_type("read").with_payload(payload);
runner.rpc("seq-kv", body, Box::new(read_handler));
} }
"read" => { "read" => {
let rn = runner.clone(); let rn = runner.clone();

View file

@ -26,7 +26,7 @@ fn main() {
let runner = Runner::new(node, out); let runner = Runner::new(node, out);
runner.run(rx); runner.run(rx, None);
i.join().unwrap(); i.join().unwrap();
} }

View file

@ -1,7 +1,9 @@
/* /*
use std::{rc::Rc, sync::Arc};
use serde_json::Value; use serde_json::Value;
use crate::Body; use crate::{mk_payload, Body, Message, Result, Runner};
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct Kv { pub struct Kv {
@ -9,20 +11,40 @@ pub struct Kv {
} }
impl Kv { impl Kv {
pub fn new(service: &str) -> Self { pub fn seq() -> Self {
Kv { Kv {
service: service.to_string(), service: "seq-kv".to_string(),
} }
} }
pub fn read(&self, key: &str) -> Body {
pub fn lin() -> Self {
Kv {
service: "lin-kv".to_string(),
}
}
pub fn lww() -> Self {
Kv {
service: "lww-kv".to_string(),
}
}
pub fn read(&self, runner: &Runner, key: &str) -> Result<Value> {
todo!() todo!()
} }
pub fn write(&self, key: &str, val: Value, create: bool) -> Body { pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> {
todo!() todo!()
} }
pub fn cas(&self, key: &str, from: Value, to: Value, create: bool) -> Body { pub fn cas(
&self,
runner: &Runner,
key: &str,
from: Value,
to: Value,
create: bool,
) -> Result<()> {
todo!() todo!()
} }
} }

View file

@ -1,63 +1,70 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
io::{BufRead, Stdout, Write}, io::{BufRead, Stdout, Write},
rc::Rc,
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicU64, AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender}, mpsc::{channel, Receiver, Sender},
Arc, Mutex, OnceLock, Arc, LazyLock, Mutex, OnceLock,
}, },
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}; };
pub mod protocol; pub mod protocol;
use protocol::ErrorCode;
pub use protocol::{Body, Message, Payload}; pub use protocol::{Body, Message, Payload};
use serde_json::Value; use serde_json::Value;
pub mod kv; pub mod kv;
pub type DynNode = Arc<Mutex<dyn Node>>; pub type DynNode = Arc<Mutex<dyn Node>>;
pub type Handler = Box<dyn FnMut(Message)>; pub type Handler = Box<dyn FnOnce(Message)>; // -> Result<Option<Value>>>;
pub type OnInit = Box<dyn Fn(&Runner)>;
pub type Result<T> = std::result::Result<T, ErrorCode>;
static MSG_ID: AtomicU64 = AtomicU64::new(0);
static OUTPUT: LazyLock<Stdout> = LazyLock::new(std::io::stdout);
pub trait Node { pub trait Node {
fn handle(&mut self, runner: &Runner, msg: Message); fn handle(&mut self, runner: &Rc<Runner>, msg: Message);
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Runner { pub struct Runner {
msg_id: Arc<AtomicU64>,
node: DynNode, node: DynNode,
node_id: OnceLock<String>, node_id: OnceLock<String>,
nodes: OnceLock<Vec<String>>, nodes: OnceLock<Vec<String>>,
steps: Arc<AtomicUsize>, steps: Arc<AtomicUsize>,
output: Arc<Mutex<Stdout>>,
handlers: Arc<Mutex<HashMap<u64, Handler>>>, handlers: Arc<Mutex<HashMap<u64, Handler>>>,
} }
impl Runner { impl Runner {
pub fn new(node: DynNode, output: Stdout) -> Self { pub fn new(node: DynNode) -> Self {
Runner { Runner {
node, node,
msg_id: Arc::new(AtomicU64::new(1)),
nodes: OnceLock::new(), nodes: OnceLock::new(),
node_id: OnceLock::new(), node_id: OnceLock::new(),
steps: Arc::new(AtomicUsize::new(0)), steps: Arc::new(AtomicUsize::new(0)),
output: Arc::new(Mutex::new(output)),
handlers: Default::default(), handlers: Default::default(),
} }
} }
pub fn run(&self, input: Receiver<Message>) { pub fn run(self: &Rc<Self>, input: Receiver<Message>, on_init: Option<OnInit>) {
for msg in input.iter() { for msg in input.iter() {
let typ = &msg.body.typ; let typ = &msg.body.typ;
if let "init" = typ.as_str() { if let "init" = typ.as_str() {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
self.reply(&msg, body); self.reply(&msg, body);
if let Some(ref h) = on_init {
h(self);
}
} else { } else {
let irt = msg.body.in_reply_to; let irt = msg.body.in_reply_to;
{ {
let mut g = self.handlers.lock().unwrap(); let mut g = self.handlers.lock().unwrap();
if let Some(mut h) = g.remove(&irt) { if let Some(h) = g.remove(&irt) {
h(msg.clone()); h(msg.clone());
} }
} }
@ -83,11 +90,11 @@ impl Runner {
} }
pub fn next_msg_id(&self) -> u64 { pub fn next_msg_id(&self) -> u64 {
self.msg_id.fetch_add(1, Ordering::SeqCst) MSG_ID.fetch_add(1, Ordering::SeqCst)
} }
pub fn cur_msg_id(&self) -> u64 { pub fn cur_msg_id(&self) -> u64 {
self.msg_id.load(Ordering::SeqCst) MSG_ID.load(Ordering::SeqCst)
} }
pub fn nodes(&self) -> &[String] { pub fn nodes(&self) -> &[String] {
@ -146,7 +153,7 @@ impl Runner {
} }
fn writeln(&self, msg: &str) { fn writeln(&self, msg: &str) {
let mut out = self.output.lock().unwrap(); let mut out = OUTPUT.lock();
let msg = format!("{msg}\n"); let msg = format!("{msg}\n");
out.write_all(msg.as_bytes()).unwrap(); out.write_all(msg.as_bytes()).unwrap();
out.flush().unwrap(); out.flush().unwrap();
@ -163,7 +170,7 @@ pub fn mk_stdin() -> (JoinHandle<()>, Sender<Message>, Receiver<Message>) {
let join = thread::spawn(move || { let join = thread::spawn(move || {
let g = stdin.lock(); let g = stdin.lock();
for line in g.lines().map_while(Result::ok) { for line in g.lines().map_while(std::result::Result::ok) {
if let Ok(msg) = serde_json::from_str(&line) { if let Ok(msg) = serde_json::from_str(&line) {
tx.send(msg).unwrap(); tx.send(msg).unwrap();
} }