adds gossiping; passes multibroadcast, not fault-tolerant.

This commit is contained in:
Joe Ardent 2024-05-17 18:05:57 -07:00
parent 8f869ff0d5
commit 75aaf3418b
1 changed files with 32 additions and 9 deletions

View File

@ -1,14 +1,14 @@
use async_trait::async_trait; use async_trait::async_trait;
use maelstrom::protocol::Message; use maelstrom::protocol::Message;
use maelstrom::{done, protocol::MessageBody, Node, Result, Runtime}; use maelstrom::{done, protocol::MessageBody, Node, Result, Runtime};
//use serde_json::{Map, Value}; use serde_json::{Map, Value};
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let handler = Arc::new(Handler::default()); let handler = Arc::new(BCaster::default());
let _ = Runtime::new() let _ = Runtime::new()
.with_handler(handler) .with_handler(handler)
.run() .run()
@ -19,27 +19,39 @@ async fn main() {
type Store = Arc<Mutex<HashMap<String, i64>>>; type Store = Arc<Mutex<HashMap<String, i64>>>;
#[derive(Clone, Default)] #[derive(Clone, Default)]
struct Handler { struct BCaster {
pub store: Store, pub store: Store,
} }
#[async_trait] #[async_trait]
impl Node for Handler { impl Node for BCaster {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let typ = req.get_type(); let typ = req.get_type();
let frm = req.src.as_str();
let mid = req.body.msg_id;
let key = format!("{frm}{mid}");
let key = key.as_str();
match typ { match typ {
"broadcast" => { "broadcast" => {
let val = req.body.extra.get("message").and_then(|v| v.as_i64()); let val = req.body.extra.get("message").and_then(|v| v.as_i64());
if let Some(val) = val { if let Some(val) = val {
let frm = req.src.as_str();
let mid = req.body.msg_id;
let key = format!("{frm}{mid}");
{ {
// there's only one thread, safe to unwrap // there's only one thread, safe to unwrap
let mut guard = self.store.lock().unwrap(); let mut guard = self.store.lock().unwrap();
guard.borrow_mut().entry(key).or_insert(val); guard.borrow_mut().entry(key.to_owned()).or_insert(val);
} }
let extra: Map<String, Value> = [
("key".to_string(), key.into()),
("val".to_string(), val.into()),
]
.into_iter()
.collect();
let gossip = MessageBody::from_extra(extra).with_type("gossip");
for node in runtime.neighbours() {
runtime.send(node, gossip.clone()).await?;
}
let body = MessageBody::new().with_type("broadcast_ok"); let body = MessageBody::new().with_type("broadcast_ok");
return runtime.reply(req, body).await; return runtime.reply(req, body).await;
} }
@ -61,6 +73,17 @@ impl Node for Handler {
let body = MessageBody::new().with_type("topology_ok"); let body = MessageBody::new().with_type("topology_ok");
return runtime.reply(req, body).await; return runtime.reply(req, body).await;
} }
"gossip" => {
let key = req.body.extra.get("key").unwrap().as_str().unwrap();
let val = req.body.extra.get("val").unwrap().as_i64().unwrap();
{
// there's only one thread, safe to unwrap
let mut guard = self.store.lock().unwrap();
guard.borrow_mut().entry(key.to_string()).or_insert(val);
}
let body = MessageBody::new().with_type("gossip_ok");
return runtime.reply(req, body).await;
}
_ => {} _ => {}
} }