Compare commits
4 commits
0be4e33b36
...
634ba4a2ba
Author | SHA1 | Date | |
---|---|---|---|
|
634ba4a2ba | ||
|
35e89d7c7d | ||
|
11d6734d29 | ||
|
cdc191a3a1 |
7 changed files with 303 additions and 40 deletions
25
Cargo.lock
generated
25
Cargo.lock
generated
|
@ -198,9 +198,8 @@ dependencies = [
|
||||||
name = "gg-echo"
|
name = "gg-echo"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"maelstrom-protocol",
|
||||||
"maelstrom-node",
|
"serde_json",
|
||||||
"tokio",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -288,6 +287,15 @@ dependencies = [
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "maelstrom-protocol"
|
||||||
|
version = "0.0.1"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_repr",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
version = "2.7.2"
|
version = "2.7.2"
|
||||||
|
@ -473,6 +481,17 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_repr"
|
||||||
|
version = "0.1.19"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.2"
|
version = "1.4.2"
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["gg-echo", "gg-uid", "gg-broadcast"]
|
members = ["maelstrom-protocol", "gg-echo", "gg-uid", "gg-broadcast"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|
|
@ -25,12 +25,15 @@ struct BCaster {
|
||||||
pub gossips: Gossips,
|
pub gossips: Gossips,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static GOSSIP: &str = "gossip";
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Node for BCaster {
|
impl Node for BCaster {
|
||||||
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
||||||
let typ = req.get_type();
|
let typ = req.get_type();
|
||||||
let frm = req.src.as_str();
|
let frm = req.src.as_str();
|
||||||
let mid = req.body.msg_id;
|
let mid = req.body.msg_id;
|
||||||
|
let nid = runtime.node_id();
|
||||||
let key = format!("{frm}{mid}");
|
let key = format!("{frm}{mid}");
|
||||||
let key = key.as_str();
|
let key = key.as_str();
|
||||||
|
|
||||||
|
@ -46,6 +49,17 @@ impl Node for BCaster {
|
||||||
}
|
}
|
||||||
|
|
||||||
match typ {
|
match typ {
|
||||||
|
// "g_ok" => {
|
||||||
|
// let id = req.body.in_reply_to;
|
||||||
|
// let key = (id, frm.to_owned());
|
||||||
|
// eprintln!("{nid} got gossip_ok for msg {id} from {frm}");
|
||||||
|
// {
|
||||||
|
// let mut g = self.gossips.lock().unwrap();
|
||||||
|
// if g.remove(&key).is_some() {
|
||||||
|
// eprintln!("{} removed message {id}", runtime.node_id());
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
"broadcast" => {
|
"broadcast" => {
|
||||||
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
||||||
if let Some(val) = val {
|
if let Some(val) = val {
|
||||||
|
@ -61,7 +75,7 @@ impl Node for BCaster {
|
||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect();
|
.collect();
|
||||||
let gossip = MessageBody::from_extra(extra).with_type("gossip");
|
let gossip = MessageBody::from_extra(extra).with_type(GOSSIP);
|
||||||
for node in runtime.neighbours() {
|
for node in runtime.neighbours() {
|
||||||
let id = runtime.next_msg_id();
|
let id = runtime.next_msg_id();
|
||||||
let gossip = gossip.clone().and_msg_id(id);
|
let gossip = gossip.clone().and_msg_id(id);
|
||||||
|
@ -69,7 +83,7 @@ impl Node for BCaster {
|
||||||
let mut g = self.gossips.lock().unwrap();
|
let mut g = self.gossips.lock().unwrap();
|
||||||
g.insert((id, node.to_owned()), gossip.clone());
|
g.insert((id, node.to_owned()), gossip.clone());
|
||||||
}
|
}
|
||||||
runtime.send(node, gossip).await?;
|
runtime.rpc(node, gossip).await.unwrap_or_default();
|
||||||
}
|
}
|
||||||
|
|
||||||
let body = MessageBody::new().with_type("broadcast_ok");
|
let body = MessageBody::new().with_type("broadcast_ok");
|
||||||
|
@ -77,6 +91,7 @@ impl Node for BCaster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"read" => {
|
"read" => {
|
||||||
|
eprintln!("{req:?}");
|
||||||
let vals = {
|
let vals = {
|
||||||
let g = self.store.lock().unwrap();
|
let g = self.store.lock().unwrap();
|
||||||
g.values().cloned().collect::<Vec<_>>()
|
g.values().cloned().collect::<Vec<_>>()
|
||||||
|
@ -99,21 +114,15 @@ impl Node for BCaster {
|
||||||
{
|
{
|
||||||
// there's only one thread, safe to unwrap
|
// there's only one thread, safe to unwrap
|
||||||
let mut guard = self.store.lock().unwrap();
|
let mut guard = self.store.lock().unwrap();
|
||||||
guard.borrow_mut().entry(key.to_string()).or_insert(val);
|
guard.entry(key.to_string()).or_insert(val);
|
||||||
}
|
}
|
||||||
let body = MessageBody::new().with_type("gossip_ok");
|
let body = MessageBody::new().with_type("g_ok");
|
||||||
return runtime.reply(req, body).await;
|
return runtime.reply(req, body).await;
|
||||||
}
|
}
|
||||||
"gossip_ok" => {
|
_ => {
|
||||||
let id = req.body.in_reply_to;
|
eprintln!("unknown type: {req:?}");
|
||||||
let key = (id, frm.to_owned());
|
|
||||||
{
|
|
||||||
let mut g = self.gossips.lock().unwrap();
|
|
||||||
g.remove(&key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
|
|
||||||
done(runtime, req)
|
done(runtime, req)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,5 @@ version.workspace = true
|
||||||
authors.workspace = true
|
authors.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait.workspace = true
|
serde_json.workspace = true
|
||||||
maelstrom-node.workspace = true
|
maelstrom-protocol = { path = "../maelstrom-protocol" }
|
||||||
tokio.workspace = true
|
|
||||||
|
|
|
@ -1,29 +1,139 @@
|
||||||
use async_trait::async_trait;
|
use maelstrom_protocol as proto;
|
||||||
use maelstrom::protocol::Message;
|
use proto::{Body, Message};
|
||||||
use maelstrom::{done, Node, Result, Runtime};
|
use std::{
|
||||||
use std::sync::Arc;
|
cell::OnceCell,
|
||||||
|
io::{BufRead, StdinLock, StdoutLock, Write},
|
||||||
|
sync::Mutex,
|
||||||
|
};
|
||||||
|
|
||||||
#[tokio::main]
|
fn main() {
|
||||||
async fn main() {
|
let out = std::io::stdout().lock();
|
||||||
let handler = Arc::new(Handler::default());
|
let input = std::io::stdin().lock();
|
||||||
let _ = Runtime::new()
|
|
||||||
.with_handler(handler)
|
let runner = &Runner::new(out);
|
||||||
.run()
|
let handler = |msg: &Message| {
|
||||||
.await
|
let typ = &msg.body.typ;
|
||||||
.unwrap_or_default();
|
match typ.as_str() {
|
||||||
|
"echo" => {
|
||||||
|
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
||||||
|
runner.reply(&msg, body);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
runner.run(input, &handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
struct Runner<'io> {
|
||||||
struct Handler {}
|
msg_id: Mutex<u64>,
|
||||||
|
node_id: OnceCell<String>,
|
||||||
#[async_trait]
|
nodes: OnceCell<Vec<String>>,
|
||||||
impl Node for Handler {
|
output: Mutex<StdoutLock<'io>>,
|
||||||
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
|
||||||
if req.get_type() == "echo" {
|
|
||||||
let echo = req.body.clone().with_type("echo_ok");
|
|
||||||
return runtime.reply(req, echo).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
done(runtime, req)
|
impl<'io> Runner<'io> {
|
||||||
|
pub fn new(output: StdoutLock<'io>) -> Self {
|
||||||
|
Runner {
|
||||||
|
output: Mutex::new(output),
|
||||||
|
msg_id: Mutex::new(1),
|
||||||
|
nodes: OnceCell::new(),
|
||||||
|
node_id: OnceCell::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(&self, input: StdinLock, handler: &dyn Fn(&Message)) {
|
||||||
|
for line in input.lines() {
|
||||||
|
match line {
|
||||||
|
Ok(line) => {
|
||||||
|
if let Ok(msg) = serde_json::from_str::<proto::Message>(&line) {
|
||||||
|
let typ = &msg.body.typ;
|
||||||
|
match typ.as_str() {
|
||||||
|
"init" => {
|
||||||
|
self.init(&msg);
|
||||||
|
|
||||||
|
let body = Body::from_type("init_ok");
|
||||||
|
self.reply(&msg, body);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
handler(&msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn node_id(&self) -> String {
|
||||||
|
self.node_id.get().cloned().unwrap_or("".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn msg_id(&self) -> u64 {
|
||||||
|
*self.msg_id.lock().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init(&self, msg: &Message) {
|
||||||
|
let node_id = msg
|
||||||
|
.body
|
||||||
|
.payload
|
||||||
|
.get("node_id")
|
||||||
|
.unwrap()
|
||||||
|
.as_str()
|
||||||
|
.unwrap()
|
||||||
|
.to_owned();
|
||||||
|
let nodes = msg
|
||||||
|
.body
|
||||||
|
.payload
|
||||||
|
.get("node_ids")
|
||||||
|
.unwrap()
|
||||||
|
.as_array()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.as_str().unwrap().to_string())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let _ = self.node_id.get_or_init(|| node_id.to_owned());
|
||||||
|
let _ = self.nodes.get_or_init(|| nodes.to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reply(&self, req: &Message, body: Body) {
|
||||||
|
let mut body = body;
|
||||||
|
let src = self.node_id.get().unwrap().to_owned();
|
||||||
|
let dest = req.src.clone();
|
||||||
|
let in_reply_to = req.body.msg_id;
|
||||||
|
body.in_reply_to = in_reply_to;
|
||||||
|
let msg = Message { src, dest, body };
|
||||||
|
self.send(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&self, msg: Message) {
|
||||||
|
let mut msg = msg;
|
||||||
|
if msg.body.msg_id == 0 {
|
||||||
|
let mid = {
|
||||||
|
let mut g = self.msg_id.lock().unwrap();
|
||||||
|
let m = *g;
|
||||||
|
*g += 1;
|
||||||
|
m
|
||||||
|
};
|
||||||
|
msg.body.msg_id = mid;
|
||||||
|
}
|
||||||
|
let msg = serde_json::to_string(&msg).unwrap();
|
||||||
|
let msg = format!("{msg}\n");
|
||||||
|
self.writeln(&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn writeln(&self, msg: &str) {
|
||||||
|
let mut out = self.output.lock().unwrap();
|
||||||
|
out.write_all(msg.as_bytes()).unwrap();
|
||||||
|
out.flush().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn message(dest: &str, src: &str, body: Body) -> Message {
|
||||||
|
Message {
|
||||||
|
dest: dest.to_owned(),
|
||||||
|
src: src.to_owned(),
|
||||||
|
body,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
10
maelstrom-protocol/Cargo.toml
Normal file
10
maelstrom-protocol/Cargo.toml
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
[package]
|
||||||
|
name = "maelstrom-protocol"
|
||||||
|
edition = "2021"
|
||||||
|
version.workspace = true
|
||||||
|
authors.workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = { version = "1", default-features = false, features = ["derive"] }
|
||||||
|
serde_json = { version = "1", default-features = false, features = ["std"] }
|
||||||
|
serde_repr = "0.1"
|
116
maelstrom-protocol/src/lib.rs
Normal file
116
maelstrom-protocol/src/lib.rs
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::{Map, Value};
|
||||||
|
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||||
|
|
||||||
|
pub type Payload = Map<String, Value>;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Message {
|
||||||
|
pub src: String,
|
||||||
|
pub dest: String,
|
||||||
|
pub body: Body,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Body {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub typ: String,
|
||||||
|
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
||||||
|
pub msg_id: u64,
|
||||||
|
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
||||||
|
pub in_reply_to: u64,
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub payload: Payload,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Body {
|
||||||
|
pub fn from_type(typ: &str) -> Self {
|
||||||
|
Body {
|
||||||
|
typ: typ.to_string(),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_msg_id(self, msg_id: u64) -> Self {
|
||||||
|
let mut b = self;
|
||||||
|
b.msg_id = msg_id;
|
||||||
|
b
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_in_reply_to(self, in_reply_to: u64) -> Self {
|
||||||
|
let mut b = self;
|
||||||
|
b.in_reply_to = in_reply_to;
|
||||||
|
b
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_payload(self, payload: Payload) -> Self {
|
||||||
|
let mut b = self;
|
||||||
|
b.payload = payload;
|
||||||
|
b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
|
||||||
|
Body::from_type("init_ok")
|
||||||
|
.with_msg_id(msg_id)
|
||||||
|
.with_in_reply_to(in_reply_to)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
pub enum ErrorCode {
|
||||||
|
Definite(DefiniteError),
|
||||||
|
Indefinite(IndefiniteError),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
|
||||||
|
#[repr(u64)]
|
||||||
|
pub enum DefiniteError {
|
||||||
|
NodeNotFound = 2,
|
||||||
|
NotSupported = 10,
|
||||||
|
TemporarilyUnavailable = 11,
|
||||||
|
MalformedRequest = 12,
|
||||||
|
Abort = 14,
|
||||||
|
KeyNotFound = 20,
|
||||||
|
KeyAlreadyExists = 21,
|
||||||
|
PreconditionFailed = 22,
|
||||||
|
TxnConflict = 30,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
|
||||||
|
#[repr(u64)]
|
||||||
|
pub enum IndefiniteError {
|
||||||
|
Timeout = 0,
|
||||||
|
Crash = 13,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
|
||||||
|
Body {
|
||||||
|
typ: "error".to_string(),
|
||||||
|
msg_id,
|
||||||
|
in_reply_to,
|
||||||
|
payload: [
|
||||||
|
("code".to_string(), serde_json::to_value(code).unwrap()),
|
||||||
|
("text".to_string(), serde_json::to_value(text).unwrap()),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::trivially_copy_pass_by_ref)]
|
||||||
|
fn u64_zero_by_ref(num: &u64) -> bool {
|
||||||
|
*num == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn error_codes() {
|
||||||
|
let ec = ErrorCode::Definite(DefiniteError::Abort);
|
||||||
|
let e = serde_json::to_string(&ec).unwrap();
|
||||||
|
assert_eq!(&e, "14");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue