diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index 3537cb7..f0085d7 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -42,7 +42,7 @@ fn main() { tx.send(msg).unwrap(); }); - runner.run(rx); + runner.run(rx, None); let _ = i.join(); let _ = g.join(); } diff --git a/gg-echo/src/main.rs b/gg-echo/src/main.rs index f6024e6..4105455 100644 --- a/gg-echo/src/main.rs +++ b/gg-echo/src/main.rs @@ -39,6 +39,6 @@ fn main() { let runner = Runner::new(node, out); - runner.run(rx); + runner.run(rx, None); i.join().unwrap(); } diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index 08df8c3..9dbdf77 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -1,8 +1,4 @@ -use std::{ - sync::{Arc, Mutex}, - thread, - time::Duration, -}; +use std::sync::{Arc, Mutex}; use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner}; @@ -14,19 +10,22 @@ fn main() { let runner = Runner::new(node, out); - let (i, extra_input, rx) = mk_stdin(); - let init = thread::spawn(move || { - thread::sleep(Duration::from_millis(10)); - extra_input - .send(Message { - body: Body::from_type("kv_init"), - ..Default::default() - }) - .unwrap(); - }); + let (i, _, rx) = mk_stdin(); - runner.run(rx); - init.join().unwrap(); + let on_init = |rnr: &Runner| { + let payload = mk_payload(&[ + ("key", KEY.into()), + ("from", 0i64.into()), + ("to", 0i64.into()), + ("create_if_not_exists", true.into()), + ]); + let body = Body::from_type("cas").with_payload(payload); + rnr.send("seq-kv", body); + }; + + let on_init = Box::new(on_init); + + runner.run(rx, Some(on_init)); i.join().unwrap(); } @@ -42,16 +41,6 @@ impl Node for Counter { let msg_id = req.body.msg_id.to_owned(); match typ { - "kv_init" => { - let payload = mk_payload(&[ - ("key", KEY.into()), - ("from", 0i64.into()), - ("to", 0i64.into()), - ("create_if_not_exists", true.into()), - ]); - let body = Body::from_type("cas").with_payload(payload); - runner.send("seq-kv", body); - } "add" => { runner.reply(&req, Body::from_type("add_ok")); } diff --git a/gg-uid/src/main.rs b/gg-uid/src/main.rs index 3c3bb85..fb8979a 100644 --- a/gg-uid/src/main.rs +++ b/gg-uid/src/main.rs @@ -26,7 +26,7 @@ fn main() { let runner = Runner::new(node, out); - runner.run(rx); + runner.run(rx, None); i.join().unwrap(); } diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index b496692..212582b 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -17,6 +17,7 @@ pub mod kv; pub type DynNode = Arc>; pub type Handler = Box; +pub type OnInit = Box; pub trait Node { fn handle(&mut self, runner: &Runner, msg: Message); @@ -46,13 +47,16 @@ impl Runner { } } - pub fn run(&self, input: Receiver) { + pub fn run(&self, input: Receiver, on_init: Option) { for msg in input.iter() { let typ = &msg.body.typ; if let "init" = typ.as_str() { self.init(&msg); let body = Body::from_type("init_ok"); self.reply(&msg, body); + if let Some(ref h) = on_init { + h(self); + } } else { let irt = msg.body.in_reply_to; {