dry out message creation

This commit is contained in:
Joe Ardent 2024-06-04 12:18:24 -07:00
parent ecd137c662
commit 95588c0dad
1 changed files with 18 additions and 22 deletions

View File

@ -32,6 +32,7 @@ pub struct Runner {
backdoor: OnceLock<Sender<Message>>, backdoor: OnceLock<Sender<Message>>,
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>, promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
outbound_tx: OnceLock<Sender<Message>>, outbound_tx: OnceLock<Sender<Message>>,
msg_id: AtomicU64,
} }
impl Runner { impl Runner {
@ -44,6 +45,7 @@ impl Runner {
backdoor: OnceLock::new(), backdoor: OnceLock::new(),
outbound_tx: OnceLock::new(), outbound_tx: OnceLock::new(),
promises: Default::default(), promises: Default::default(),
msg_id: AtomicU64::new(1),
} }
} }
@ -80,11 +82,11 @@ impl Runner {
} }
pub fn next_msg_id(&self) -> u64 { pub fn next_msg_id(&self) -> u64 {
MSG_ID.fetch_add(1, Ordering::SeqCst) self.msg_id.fetch_add(1, Ordering::SeqCst)
} }
pub fn cur_msg_id(&self) -> u64 { pub fn cur_msg_id(&self) -> u64 {
MSG_ID.load(Ordering::SeqCst) self.msg_id.load(Ordering::SeqCst)
} }
pub fn nodes(&self) -> &[String] { pub fn nodes(&self) -> &[String] {
@ -100,28 +102,12 @@ impl Runner {
} }
pub fn send(&self, dest: &str, body: Body) { pub fn send(&self, dest: &str, body: Body) {
let mut body = body; let msg = self.mk_msg(dest, body);
if body.msg_id == 0 {
body.msg_id = self.next_msg_id();
}
let msg = Message {
src: self.node_id().to_string(),
dest: dest.to_string(),
body,
};
self.outbound_tx.get().unwrap().send(msg).unwrap(); self.outbound_tx.get().unwrap().send(msg).unwrap();
} }
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise { pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
let mut body = body; let msg = self.mk_msg(dest, body);
if body.msg_id == 0 {
body.msg_id = self.next_msg_id();
}
let msg = Message {
src: self.node_id().to_string(),
dest: dest.to_string(),
body,
};
let (tx, rx) = channel(); let (tx, rx) = channel();
{ {
let msg_id = msg.body.msg_id; let msg_id = msg.body.msg_id;
@ -197,6 +183,18 @@ impl Runner {
} }
}); });
} }
fn mk_msg(&self, dest: &str, body: Body) -> Message {
let mut body = body;
if body.msg_id == 0 {
body.msg_id = self.next_msg_id();
}
Message {
src: self.node_id().to_string(),
dest: dest.to_string(),
body,
}
}
} }
pub fn check_err(msg: &Message) -> RpcResult { pub fn check_err(msg: &Message) -> RpcResult {
@ -213,5 +211,3 @@ pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
.map(|p| (p.0.to_string(), p.1.clone())) .map(|p| (p.0.to_string(), p.1.clone()))
.collect() .collect()
} }
static MSG_ID: AtomicU64 = AtomicU64::new(0);