From 8f869ff0d569b198921d538bce58fcfd91022c04 Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Fri, 17 May 2024 17:27:47 -0700 Subject: [PATCH] single-node broadcast works --- Cargo.lock | 10 ++++++ Cargo.toml | 3 +- gg-broadcast/Cargo.toml | 12 +++++++ gg-broadcast/src/main.rs | 69 ++++++++++++++++++++++++++++++++++++++++ gg-uid/Cargo.toml | 2 +- 5 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 gg-broadcast/Cargo.toml create mode 100644 gg-broadcast/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 3985187..02c4cdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,6 +184,16 @@ dependencies = [ "slab", ] +[[package]] +name = "gg-broadcast" +version = "0.0.1" +dependencies = [ + "async-trait", + "maelstrom-node", + "serde_json", + "tokio", +] + [[package]] name = "gg-echo" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 3222a10..728b983 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["gg-echo", "gg-uid"] +members = ["gg-echo", "gg-uid", "gg-broadcast"] resolver = "2" [workspace.package] @@ -11,3 +11,4 @@ license-file = "LICENSE.md" async-trait = "0.1" maelstrom-node = "0.1" tokio = { version = "1", default-features = false, features = ["io-util", "io-std", "rt-multi-thread", "macros"] } +serde_json = "1" diff --git a/gg-broadcast/Cargo.toml b/gg-broadcast/Cargo.toml new file mode 100644 index 0000000..d6b9019 --- /dev/null +++ b/gg-broadcast/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "gg-broadcast" +edition = "2021" +version.workspace = true +authors.workspace = true +license-file.workspace = true + +[dependencies] +async-trait.workspace = true +maelstrom-node.workspace = true +serde_json.workspace = true +tokio.workspace = true diff --git a/gg-broadcast/src/main.rs b/gg-broadcast/src/main.rs new file mode 100644 index 0000000..e837813 --- /dev/null +++ b/gg-broadcast/src/main.rs @@ -0,0 +1,69 @@ +use async_trait::async_trait; +use maelstrom::protocol::Message; +use maelstrom::{done, protocol::MessageBody, Node, Result, Runtime}; +//use serde_json::{Map, Value}; +use std::borrow::BorrowMut; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +#[tokio::main] +async fn main() { + let handler = Arc::new(Handler::default()); + let _ = Runtime::new() + .with_handler(handler) + .run() + .await + .unwrap_or_default(); +} + +type Store = Arc>>; + +#[derive(Clone, Default)] +struct Handler { + pub store: Store, +} + +#[async_trait] +impl Node for Handler { + async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { + let typ = req.get_type(); + + match typ { + "broadcast" => { + let val = req.body.extra.get("message").and_then(|v| v.as_i64()); + if let Some(val) = val { + let frm = req.src.as_str(); + let mid = req.body.msg_id; + let key = format!("{frm}{mid}"); + { + // there's only one thread, safe to unwrap + let mut guard = self.store.lock().unwrap(); + guard.borrow_mut().entry(key).or_insert(val); + } + let body = MessageBody::new().with_type("broadcast_ok"); + return runtime.reply(req, body).await; + } + } + "read" => { + let vals = { + let g = self.store.lock().unwrap(); + g.values().cloned().collect::>() + }; + let body = MessageBody::from_extra( + [("messages".to_string(), vals.into())] + .into_iter() + .collect(), + ) + .with_type("read_ok"); + return runtime.reply(req, body).await; + } + "topology" => { + let body = MessageBody::new().with_type("topology_ok"); + return runtime.reply(req, body).await; + } + _ => {} + } + + done(runtime, req) + } +} diff --git a/gg-uid/Cargo.toml b/gg-uid/Cargo.toml index d9a46e8..d3d06ac 100644 --- a/gg-uid/Cargo.toml +++ b/gg-uid/Cargo.toml @@ -8,5 +8,5 @@ license-file.workspace = true [dependencies] async-trait.workspace = true maelstrom-node.workspace = true -serde_json = "1.0.117" +serde_json.workspace = true tokio.workspace = true