diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index 1cfe631..d658442 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -25,12 +25,15 @@ struct BCaster { pub gossips: Gossips, } +static GOSSIP: &str = "gossip"; + #[async_trait] impl Node for BCaster { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { let typ = req.get_type(); let frm = req.src.as_str(); let mid = req.body.msg_id; + let nid = runtime.node_id(); let key = format!("{frm}{mid}"); let key = key.as_str(); @@ -46,6 +49,17 @@ impl Node for BCaster { } match typ { + // "g_ok" => { + // let id = req.body.in_reply_to; + // let key = (id, frm.to_owned()); + // eprintln!("{nid} got gossip_ok for msg {id} from {frm}"); + // { + // let mut g = self.gossips.lock().unwrap(); + // if g.remove(&key).is_some() { + // eprintln!("{} removed message {id}", runtime.node_id()); + // } + // } + // } "broadcast" => { let val = req.body.extra.get("message").and_then(|v| v.as_i64()); if let Some(val) = val { @@ -61,7 +75,7 @@ impl Node for BCaster { ] .into_iter() .collect(); - let gossip = MessageBody::from_extra(extra).with_type("gossip"); + let gossip = MessageBody::from_extra(extra).with_type(GOSSIP); for node in runtime.neighbours() { let id = runtime.next_msg_id(); let gossip = gossip.clone().and_msg_id(id); @@ -69,7 +83,7 @@ impl Node for BCaster { let mut g = self.gossips.lock().unwrap(); g.insert((id, node.to_owned()), gossip.clone()); } - runtime.send(node, gossip).await?; + runtime.rpc(node, gossip).await.unwrap_or_default(); } let body = MessageBody::new().with_type("broadcast_ok"); @@ -77,6 +91,7 @@ impl Node for BCaster { } } "read" => { + eprintln!("{req:?}"); let vals = { let g = self.store.lock().unwrap(); g.values().cloned().collect::>() @@ -99,20 +114,14 @@ impl Node for BCaster { { // there's only one thread, safe to unwrap let mut guard = self.store.lock().unwrap(); - guard.borrow_mut().entry(key.to_string()).or_insert(val); + guard.entry(key.to_string()).or_insert(val); } - let body = MessageBody::new().with_type("gossip_ok"); + let body = MessageBody::new().with_type("g_ok"); return runtime.reply(req, body).await; } - "gossip_ok" => { - let id = req.body.in_reply_to; - let key = (id, frm.to_owned()); - { - let mut g = self.gossips.lock().unwrap(); - g.remove(&key); - } + _ => { + eprintln!("unknown type: {req:?}"); } - _ => {} } done(runtime, req)