diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index ff2e39d..1cfe631 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -17,10 +17,12 @@ async fn main() { } type Store = Arc>>; +type Gossips = Arc>>; #[derive(Clone, Default)] struct BCaster { pub store: Store, + pub gossips: Gossips, } #[async_trait] @@ -31,6 +33,18 @@ impl Node for BCaster { let mid = req.body.msg_id; let key = format!("{frm}{mid}"); let key = key.as_str(); + + { + let gossips: Vec<((_, String), MessageBody)> = { + let g = self.gossips.lock().unwrap(); + g.clone().into_iter().collect() + }; + + for ((_, node), gossip) in gossips.into_iter() { + runtime.send(node, gossip).await.unwrap_or_default(); + } + } + match typ { "broadcast" => { let val = req.body.extra.get("message").and_then(|v| v.as_i64()); @@ -49,7 +63,13 @@ impl Node for BCaster { .collect(); let gossip = MessageBody::from_extra(extra).with_type("gossip"); for node in runtime.neighbours() { - runtime.send(node, gossip.clone()).await?; + let id = runtime.next_msg_id(); + let gossip = gossip.clone().and_msg_id(id); + { + let mut g = self.gossips.lock().unwrap(); + g.insert((id, node.to_owned()), gossip.clone()); + } + runtime.send(node, gossip).await?; } let body = MessageBody::new().with_type("broadcast_ok"); @@ -84,6 +104,14 @@ impl Node for BCaster { let body = MessageBody::new().with_type("gossip_ok"); return runtime.reply(req, body).await; } + "gossip_ok" => { + let id = req.body.in_reply_to; + let key = (id, frm.to_owned()); + { + let mut g = self.gossips.lock().unwrap(); + g.remove(&key); + } + } _ => {} }