This commit is contained in:
Joe Ardent 2024-05-19 15:22:20 -07:00
parent cdc191a3a1
commit 11d6734d29
1 changed files with 21 additions and 12 deletions

View File

@ -25,12 +25,15 @@ struct BCaster {
pub gossips: Gossips, pub gossips: Gossips,
} }
static GOSSIP: &str = "gossip";
#[async_trait] #[async_trait]
impl Node for BCaster { impl Node for BCaster {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let typ = req.get_type(); let typ = req.get_type();
let frm = req.src.as_str(); let frm = req.src.as_str();
let mid = req.body.msg_id; let mid = req.body.msg_id;
let nid = runtime.node_id();
let key = format!("{frm}{mid}"); let key = format!("{frm}{mid}");
let key = key.as_str(); let key = key.as_str();
@ -46,6 +49,17 @@ impl Node for BCaster {
} }
match typ { match typ {
// "g_ok" => {
// let id = req.body.in_reply_to;
// let key = (id, frm.to_owned());
// eprintln!("{nid} got gossip_ok for msg {id} from {frm}");
// {
// let mut g = self.gossips.lock().unwrap();
// if g.remove(&key).is_some() {
// eprintln!("{} removed message {id}", runtime.node_id());
// }
// }
// }
"broadcast" => { "broadcast" => {
let val = req.body.extra.get("message").and_then(|v| v.as_i64()); let val = req.body.extra.get("message").and_then(|v| v.as_i64());
if let Some(val) = val { if let Some(val) = val {
@ -61,7 +75,7 @@ impl Node for BCaster {
] ]
.into_iter() .into_iter()
.collect(); .collect();
let gossip = MessageBody::from_extra(extra).with_type("gossip"); let gossip = MessageBody::from_extra(extra).with_type(GOSSIP);
for node in runtime.neighbours() { for node in runtime.neighbours() {
let id = runtime.next_msg_id(); let id = runtime.next_msg_id();
let gossip = gossip.clone().and_msg_id(id); let gossip = gossip.clone().and_msg_id(id);
@ -69,7 +83,7 @@ impl Node for BCaster {
let mut g = self.gossips.lock().unwrap(); let mut g = self.gossips.lock().unwrap();
g.insert((id, node.to_owned()), gossip.clone()); g.insert((id, node.to_owned()), gossip.clone());
} }
runtime.send(node, gossip).await?; runtime.rpc(node, gossip).await.unwrap_or_default();
} }
let body = MessageBody::new().with_type("broadcast_ok"); let body = MessageBody::new().with_type("broadcast_ok");
@ -77,6 +91,7 @@ impl Node for BCaster {
} }
} }
"read" => { "read" => {
eprintln!("{req:?}");
let vals = { let vals = {
let g = self.store.lock().unwrap(); let g = self.store.lock().unwrap();
g.values().cloned().collect::<Vec<_>>() g.values().cloned().collect::<Vec<_>>()
@ -99,21 +114,15 @@ impl Node for BCaster {
{ {
// there's only one thread, safe to unwrap // there's only one thread, safe to unwrap
let mut guard = self.store.lock().unwrap(); let mut guard = self.store.lock().unwrap();
guard.borrow_mut().entry(key.to_string()).or_insert(val); guard.entry(key.to_string()).or_insert(val);
} }
let body = MessageBody::new().with_type("gossip_ok"); let body = MessageBody::new().with_type("g_ok");
return runtime.reply(req, body).await; return runtime.reply(req, body).await;
} }
"gossip_ok" => { _ => {
let id = req.body.in_reply_to; eprintln!("unknown type: {req:?}");
let key = (id, frm.to_owned());
{
let mut g = self.gossips.lock().unwrap();
g.remove(&key);
} }
} }
_ => {}
}
done(runtime, req) done(runtime, req)
} }