From 75aaf3418be3e5911c1bf29ca870b954a98b798b Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Fri, 17 May 2024 18:05:57 -0700 Subject: [PATCH] adds gossiping; passes multibroadcast, not fault-tolerant. --- gg-broadcast/src/main.rs | 41 +++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs index e837813..ff2e39d 100644 --- a/gg-broadcast/src/main.rs +++ b/gg-broadcast/src/main.rs @@ -1,14 +1,14 @@ use async_trait::async_trait; use maelstrom::protocol::Message; use maelstrom::{done, protocol::MessageBody, Node, Result, Runtime}; -//use serde_json::{Map, Value}; +use serde_json::{Map, Value}; use std::borrow::BorrowMut; use std::collections::HashMap; use std::sync::{Arc, Mutex}; #[tokio::main] async fn main() { - let handler = Arc::new(Handler::default()); + let handler = Arc::new(BCaster::default()); let _ = Runtime::new() .with_handler(handler) .run() @@ -19,27 +19,39 @@ async fn main() { type Store = Arc>>; #[derive(Clone, Default)] -struct Handler { +struct BCaster { pub store: Store, } #[async_trait] -impl Node for Handler { +impl Node for BCaster { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { let typ = req.get_type(); - + let frm = req.src.as_str(); + let mid = req.body.msg_id; + let key = format!("{frm}{mid}"); + let key = key.as_str(); match typ { "broadcast" => { let val = req.body.extra.get("message").and_then(|v| v.as_i64()); if let Some(val) = val { - let frm = req.src.as_str(); - let mid = req.body.msg_id; - let key = format!("{frm}{mid}"); { // there's only one thread, safe to unwrap let mut guard = self.store.lock().unwrap(); - guard.borrow_mut().entry(key).or_insert(val); + guard.borrow_mut().entry(key.to_owned()).or_insert(val); } + + let extra: Map = [ + ("key".to_string(), key.into()), + ("val".to_string(), val.into()), + ] + .into_iter() + .collect(); + let gossip = MessageBody::from_extra(extra).with_type("gossip"); + for node in runtime.neighbours() { + runtime.send(node, gossip.clone()).await?; + } + let body = MessageBody::new().with_type("broadcast_ok"); return runtime.reply(req, body).await; } @@ -61,6 +73,17 @@ impl Node for Handler { let body = MessageBody::new().with_type("topology_ok"); return runtime.reply(req, body).await; } + "gossip" => { + let key = req.body.extra.get("key").unwrap().as_str().unwrap(); + let val = req.body.extra.get("val").unwrap().as_i64().unwrap(); + { + // there's only one thread, safe to unwrap + let mut guard = self.store.lock().unwrap(); + guard.borrow_mut().entry(key.to_string()).or_insert(val); + } + let body = MessageBody::new().with_type("gossip_ok"); + return runtime.reply(req, body).await; + } _ => {} }