add support for callback on init in the runner.

This commit is contained in:
Joe Ardent 2024-05-29 14:46:09 -07:00
parent 461087023d
commit 8638ffe67f
5 changed files with 24 additions and 31 deletions

View file

@ -42,7 +42,7 @@ fn main() {
tx.send(msg).unwrap(); tx.send(msg).unwrap();
}); });
runner.run(rx); runner.run(rx, None);
let _ = i.join(); let _ = i.join();
let _ = g.join(); let _ = g.join();
} }

View file

@ -39,6 +39,6 @@ fn main() {
let runner = Runner::new(node, out); let runner = Runner::new(node, out);
runner.run(rx); runner.run(rx, None);
i.join().unwrap(); i.join().unwrap();
} }

View file

@ -1,8 +1,4 @@
use std::{ use std::sync::{Arc, Mutex};
sync::{Arc, Mutex},
thread,
time::Duration,
};
use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner}; use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner};
@ -14,19 +10,22 @@ fn main() {
let runner = Runner::new(node, out); let runner = Runner::new(node, out);
let (i, extra_input, rx) = mk_stdin(); let (i, _, rx) = mk_stdin();
let init = thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
extra_input
.send(Message {
body: Body::from_type("kv_init"),
..Default::default()
})
.unwrap();
});
runner.run(rx); let on_init = |rnr: &Runner| {
init.join().unwrap(); let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
rnr.send("seq-kv", body);
};
let on_init = Box::new(on_init);
runner.run(rx, Some(on_init));
i.join().unwrap(); i.join().unwrap();
} }
@ -42,16 +41,6 @@ impl Node for Counter {
let msg_id = req.body.msg_id.to_owned(); let msg_id = req.body.msg_id.to_owned();
match typ { match typ {
"kv_init" => {
let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
runner.send("seq-kv", body);
}
"add" => { "add" => {
runner.reply(&req, Body::from_type("add_ok")); runner.reply(&req, Body::from_type("add_ok"));
} }

View file

@ -26,7 +26,7 @@ fn main() {
let runner = Runner::new(node, out); let runner = Runner::new(node, out);
runner.run(rx); runner.run(rx, None);
i.join().unwrap(); i.join().unwrap();
} }

View file

@ -17,6 +17,7 @@ pub mod kv;
pub type DynNode = Arc<Mutex<dyn Node>>; pub type DynNode = Arc<Mutex<dyn Node>>;
pub type Handler = Box<dyn FnMut(Message)>; pub type Handler = Box<dyn FnMut(Message)>;
pub type OnInit = Box<dyn Fn(&Runner)>;
pub trait Node { pub trait Node {
fn handle(&mut self, runner: &Runner, msg: Message); fn handle(&mut self, runner: &Runner, msg: Message);
@ -46,13 +47,16 @@ impl Runner {
} }
} }
pub fn run(&self, input: Receiver<Message>) { pub fn run(&self, input: Receiver<Message>, on_init: Option<OnInit>) {
for msg in input.iter() { for msg in input.iter() {
let typ = &msg.body.typ; let typ = &msg.body.typ;
if let "init" = typ.as_str() { if let "init" = typ.as_str() {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
self.reply(&msg, body); self.reply(&msg, body);
if let Some(ref h) = on_init {
h(self);
}
} else { } else {
let irt = msg.body.in_reply_to; let irt = msg.body.in_reply_to;
{ {