Compare commits
4 commits
0be4e33b36
...
634ba4a2ba
Author | SHA1 | Date | |
---|---|---|---|
|
634ba4a2ba | ||
|
35e89d7c7d | ||
|
11d6734d29 | ||
|
cdc191a3a1 |
7 changed files with 303 additions and 40 deletions
25
Cargo.lock
generated
25
Cargo.lock
generated
|
@ -198,9 +198,8 @@ dependencies = [
|
|||
name = "gg-echo"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"maelstrom-node",
|
||||
"tokio",
|
||||
"maelstrom-protocol",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -288,6 +287,15 @@ dependencies = [
|
|||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "maelstrom-protocol"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_repr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.2"
|
||||
|
@ -473,6 +481,17 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_repr"
|
||||
version = "0.1.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.2"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[workspace]
|
||||
members = ["gg-echo", "gg-uid", "gg-broadcast"]
|
||||
members = ["maelstrom-protocol", "gg-echo", "gg-uid", "gg-broadcast"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
|
|
|
@ -25,12 +25,15 @@ struct BCaster {
|
|||
pub gossips: Gossips,
|
||||
}
|
||||
|
||||
static GOSSIP: &str = "gossip";
|
||||
|
||||
#[async_trait]
|
||||
impl Node for BCaster {
|
||||
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
||||
let typ = req.get_type();
|
||||
let frm = req.src.as_str();
|
||||
let mid = req.body.msg_id;
|
||||
let nid = runtime.node_id();
|
||||
let key = format!("{frm}{mid}");
|
||||
let key = key.as_str();
|
||||
|
||||
|
@ -46,6 +49,17 @@ impl Node for BCaster {
|
|||
}
|
||||
|
||||
match typ {
|
||||
// "g_ok" => {
|
||||
// let id = req.body.in_reply_to;
|
||||
// let key = (id, frm.to_owned());
|
||||
// eprintln!("{nid} got gossip_ok for msg {id} from {frm}");
|
||||
// {
|
||||
// let mut g = self.gossips.lock().unwrap();
|
||||
// if g.remove(&key).is_some() {
|
||||
// eprintln!("{} removed message {id}", runtime.node_id());
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
"broadcast" => {
|
||||
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
||||
if let Some(val) = val {
|
||||
|
@ -61,7 +75,7 @@ impl Node for BCaster {
|
|||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
let gossip = MessageBody::from_extra(extra).with_type("gossip");
|
||||
let gossip = MessageBody::from_extra(extra).with_type(GOSSIP);
|
||||
for node in runtime.neighbours() {
|
||||
let id = runtime.next_msg_id();
|
||||
let gossip = gossip.clone().and_msg_id(id);
|
||||
|
@ -69,7 +83,7 @@ impl Node for BCaster {
|
|||
let mut g = self.gossips.lock().unwrap();
|
||||
g.insert((id, node.to_owned()), gossip.clone());
|
||||
}
|
||||
runtime.send(node, gossip).await?;
|
||||
runtime.rpc(node, gossip).await.unwrap_or_default();
|
||||
}
|
||||
|
||||
let body = MessageBody::new().with_type("broadcast_ok");
|
||||
|
@ -77,6 +91,7 @@ impl Node for BCaster {
|
|||
}
|
||||
}
|
||||
"read" => {
|
||||
eprintln!("{req:?}");
|
||||
let vals = {
|
||||
let g = self.store.lock().unwrap();
|
||||
g.values().cloned().collect::<Vec<_>>()
|
||||
|
@ -99,21 +114,15 @@ impl Node for BCaster {
|
|||
{
|
||||
// there's only one thread, safe to unwrap
|
||||
let mut guard = self.store.lock().unwrap();
|
||||
guard.borrow_mut().entry(key.to_string()).or_insert(val);
|
||||
guard.entry(key.to_string()).or_insert(val);
|
||||
}
|
||||
let body = MessageBody::new().with_type("gossip_ok");
|
||||
let body = MessageBody::new().with_type("g_ok");
|
||||
return runtime.reply(req, body).await;
|
||||
}
|
||||
"gossip_ok" => {
|
||||
let id = req.body.in_reply_to;
|
||||
let key = (id, frm.to_owned());
|
||||
{
|
||||
let mut g = self.gossips.lock().unwrap();
|
||||
g.remove(&key);
|
||||
_ => {
|
||||
eprintln!("unknown type: {req:?}");
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
done(runtime, req)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,5 @@ version.workspace = true
|
|||
authors.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
maelstrom-node.workspace = true
|
||||
tokio.workspace = true
|
||||
serde_json.workspace = true
|
||||
maelstrom-protocol = { path = "../maelstrom-protocol" }
|
||||
|
|
|
@ -1,29 +1,139 @@
|
|||
use async_trait::async_trait;
|
||||
use maelstrom::protocol::Message;
|
||||
use maelstrom::{done, Node, Result, Runtime};
|
||||
use std::sync::Arc;
|
||||
use maelstrom_protocol as proto;
|
||||
use proto::{Body, Message};
|
||||
use std::{
|
||||
cell::OnceCell,
|
||||
io::{BufRead, StdinLock, StdoutLock, Write},
|
||||
sync::Mutex,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let handler = Arc::new(Handler::default());
|
||||
let _ = Runtime::new()
|
||||
.with_handler(handler)
|
||||
.run()
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
fn main() {
|
||||
let out = std::io::stdout().lock();
|
||||
let input = std::io::stdin().lock();
|
||||
|
||||
let runner = &Runner::new(out);
|
||||
let handler = |msg: &Message| {
|
||||
let typ = &msg.body.typ;
|
||||
match typ.as_str() {
|
||||
"echo" => {
|
||||
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
||||
runner.reply(&msg, body);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
runner.run(input, &handler);
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct Handler {}
|
||||
struct Runner<'io> {
|
||||
msg_id: Mutex<u64>,
|
||||
node_id: OnceCell<String>,
|
||||
nodes: OnceCell<Vec<String>>,
|
||||
output: Mutex<StdoutLock<'io>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Node for Handler {
|
||||
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
||||
if req.get_type() == "echo" {
|
||||
let echo = req.body.clone().with_type("echo_ok");
|
||||
return runtime.reply(req, echo).await;
|
||||
impl<'io> Runner<'io> {
|
||||
pub fn new(output: StdoutLock<'io>) -> Self {
|
||||
Runner {
|
||||
output: Mutex::new(output),
|
||||
msg_id: Mutex::new(1),
|
||||
nodes: OnceCell::new(),
|
||||
node_id: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
done(runtime, req)
|
||||
pub fn run(&self, input: StdinLock, handler: &dyn Fn(&Message)) {
|
||||
for line in input.lines() {
|
||||
match line {
|
||||
Ok(line) => {
|
||||
if let Ok(msg) = serde_json::from_str::<proto::Message>(&line) {
|
||||
let typ = &msg.body.typ;
|
||||
match typ.as_str() {
|
||||
"init" => {
|
||||
self.init(&msg);
|
||||
|
||||
let body = Body::from_type("init_ok");
|
||||
self.reply(&msg, body);
|
||||
}
|
||||
_ => {
|
||||
handler(&msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn node_id(&self) -> String {
|
||||
self.node_id.get().cloned().unwrap_or("".to_string())
|
||||
}
|
||||
|
||||
pub fn msg_id(&self) -> u64 {
|
||||
*self.msg_id.lock().unwrap()
|
||||
}
|
||||
|
||||
pub fn init(&self, msg: &Message) {
|
||||
let node_id = msg
|
||||
.body
|
||||
.payload
|
||||
.get("node_id")
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_owned();
|
||||
let nodes = msg
|
||||
.body
|
||||
.payload
|
||||
.get("node_ids")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|s| s.as_str().unwrap().to_string())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let _ = self.node_id.get_or_init(|| node_id.to_owned());
|
||||
let _ = self.nodes.get_or_init(|| nodes.to_vec());
|
||||
}
|
||||
|
||||
pub fn reply(&self, req: &Message, body: Body) {
|
||||
let mut body = body;
|
||||
let src = self.node_id.get().unwrap().to_owned();
|
||||
let dest = req.src.clone();
|
||||
let in_reply_to = req.body.msg_id;
|
||||
body.in_reply_to = in_reply_to;
|
||||
let msg = Message { src, dest, body };
|
||||
self.send(msg);
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: Message) {
|
||||
let mut msg = msg;
|
||||
if msg.body.msg_id == 0 {
|
||||
let mid = {
|
||||
let mut g = self.msg_id.lock().unwrap();
|
||||
let m = *g;
|
||||
*g += 1;
|
||||
m
|
||||
};
|
||||
msg.body.msg_id = mid;
|
||||
}
|
||||
let msg = serde_json::to_string(&msg).unwrap();
|
||||
let msg = format!("{msg}\n");
|
||||
self.writeln(&msg);
|
||||
}
|
||||
|
||||
fn writeln(&self, msg: &str) {
|
||||
let mut out = self.output.lock().unwrap();
|
||||
out.write_all(msg.as_bytes()).unwrap();
|
||||
out.flush().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn message(dest: &str, src: &str, body: Body) -> Message {
|
||||
Message {
|
||||
dest: dest.to_owned(),
|
||||
src: src.to_owned(),
|
||||
body,
|
||||
}
|
||||
}
|
||||
|
|
10
maelstrom-protocol/Cargo.toml
Normal file
10
maelstrom-protocol/Cargo.toml
Normal file
|
@ -0,0 +1,10 @@
|
|||
[package]
|
||||
name = "maelstrom-protocol"
|
||||
edition = "2021"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1", default-features = false, features = ["derive"] }
|
||||
serde_json = { version = "1", default-features = false, features = ["std"] }
|
||||
serde_repr = "0.1"
|
116
maelstrom-protocol/src/lib.rs
Normal file
116
maelstrom-protocol/src/lib.rs
Normal file
|
@ -0,0 +1,116 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Map, Value};
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
|
||||
pub type Payload = Map<String, Value>;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Message {
|
||||
pub src: String,
|
||||
pub dest: String,
|
||||
pub body: Body,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
pub struct Body {
|
||||
#[serde(rename = "type")]
|
||||
pub typ: String,
|
||||
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
||||
pub msg_id: u64,
|
||||
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
||||
pub in_reply_to: u64,
|
||||
#[serde(flatten)]
|
||||
pub payload: Payload,
|
||||
}
|
||||
|
||||
impl Body {
|
||||
pub fn from_type(typ: &str) -> Self {
|
||||
Body {
|
||||
typ: typ.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_msg_id(self, msg_id: u64) -> Self {
|
||||
let mut b = self;
|
||||
b.msg_id = msg_id;
|
||||
b
|
||||
}
|
||||
|
||||
pub fn with_in_reply_to(self, in_reply_to: u64) -> Self {
|
||||
let mut b = self;
|
||||
b.in_reply_to = in_reply_to;
|
||||
b
|
||||
}
|
||||
|
||||
pub fn with_payload(self, payload: Payload) -> Self {
|
||||
let mut b = self;
|
||||
b.payload = payload;
|
||||
b
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
|
||||
Body::from_type("init_ok")
|
||||
.with_msg_id(msg_id)
|
||||
.with_in_reply_to(in_reply_to)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ErrorCode {
|
||||
Definite(DefiniteError),
|
||||
Indefinite(IndefiniteError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
|
||||
#[repr(u64)]
|
||||
pub enum DefiniteError {
|
||||
NodeNotFound = 2,
|
||||
NotSupported = 10,
|
||||
TemporarilyUnavailable = 11,
|
||||
MalformedRequest = 12,
|
||||
Abort = 14,
|
||||
KeyNotFound = 20,
|
||||
KeyAlreadyExists = 21,
|
||||
PreconditionFailed = 22,
|
||||
TxnConflict = 30,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
|
||||
#[repr(u64)]
|
||||
pub enum IndefiniteError {
|
||||
Timeout = 0,
|
||||
Crash = 13,
|
||||
}
|
||||
|
||||
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
|
||||
Body {
|
||||
typ: "error".to_string(),
|
||||
msg_id,
|
||||
in_reply_to,
|
||||
payload: [
|
||||
("code".to_string(), serde_json::to_value(code).unwrap()),
|
||||
("text".to_string(), serde_json::to_value(text).unwrap()),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::trivially_copy_pass_by_ref)]
|
||||
fn u64_zero_by_ref(num: &u64) -> bool {
|
||||
*num == 0
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn error_codes() {
|
||||
let ec = ErrorCode::Definite(DefiniteError::Abort);
|
||||
let e = serde_json::to_string(&ec).unwrap();
|
||||
assert_eq!(&e, "14");
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue