diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index 7f048b4..a4dded8 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -32,6 +32,7 @@ pub struct Runner { backdoor: OnceLock>, promises: Arc>>>, outbound_tx: OnceLock>, + msg_id: AtomicU64, } impl Runner { @@ -44,6 +45,7 @@ impl Runner { backdoor: OnceLock::new(), outbound_tx: OnceLock::new(), promises: Default::default(), + msg_id: AtomicU64::new(1), } } @@ -80,11 +82,11 @@ impl Runner { } pub fn next_msg_id(&self) -> u64 { - MSG_ID.fetch_add(1, Ordering::SeqCst) + self.msg_id.fetch_add(1, Ordering::SeqCst) } pub fn cur_msg_id(&self) -> u64 { - MSG_ID.load(Ordering::SeqCst) + self.msg_id.load(Ordering::SeqCst) } pub fn nodes(&self) -> &[String] { @@ -100,28 +102,12 @@ impl Runner { } pub fn send(&self, dest: &str, body: Body) { - let mut body = body; - if body.msg_id == 0 { - body.msg_id = self.next_msg_id(); - } - let msg = Message { - src: self.node_id().to_string(), - dest: dest.to_string(), - body, - }; + let msg = self.mk_msg(dest, body); self.outbound_tx.get().unwrap().send(msg).unwrap(); } pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { - let mut body = body; - if body.msg_id == 0 { - body.msg_id = self.next_msg_id(); - } - let msg = Message { - src: self.node_id().to_string(), - dest: dest.to_string(), - body, - }; + let msg = self.mk_msg(dest, body); let (tx, rx) = channel(); { let msg_id = msg.body.msg_id; @@ -197,6 +183,18 @@ impl Runner { } }); } + + fn mk_msg(&self, dest: &str, body: Body) -> Message { + let mut body = body; + if body.msg_id == 0 { + body.msg_id = self.next_msg_id(); + } + Message { + src: self.node_id().to_string(), + dest: dest.to_string(), + body, + } + } } pub fn check_err(msg: &Message) -> RpcResult { @@ -213,5 +211,3 @@ pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { .map(|p| (p.0.to_string(), p.1.clone())) .collect() } - -static MSG_ID: AtomicU64 = AtomicU64::new(0);