Compare commits
No commits in common. "634ba4a2ba3ffd7e1573f2aaa3ef574d5a0ff51d" and "0be4e33b36a17f1d73f5f8d6c56054cd73640e3e" have entirely different histories.
634ba4a2ba
...
0be4e33b36
7 changed files with 40 additions and 303 deletions
25
Cargo.lock
generated
25
Cargo.lock
generated
|
@ -198,8 +198,9 @@ dependencies = [
|
||||||
name = "gg-echo"
|
name = "gg-echo"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"maelstrom-protocol",
|
"async-trait",
|
||||||
"serde_json",
|
"maelstrom-node",
|
||||||
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -287,15 +288,6 @@ dependencies = [
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "maelstrom-protocol"
|
|
||||||
version = "0.0.1"
|
|
||||||
dependencies = [
|
|
||||||
"serde",
|
|
||||||
"serde_json",
|
|
||||||
"serde_repr",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
version = "2.7.2"
|
version = "2.7.2"
|
||||||
|
@ -481,17 +473,6 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "serde_repr"
|
|
||||||
version = "0.1.19"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.2"
|
version = "1.4.2"
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["maelstrom-protocol", "gg-echo", "gg-uid", "gg-broadcast"]
|
members = ["gg-echo", "gg-uid", "gg-broadcast"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|
|
@ -25,15 +25,12 @@ struct BCaster {
|
||||||
pub gossips: Gossips,
|
pub gossips: Gossips,
|
||||||
}
|
}
|
||||||
|
|
||||||
static GOSSIP: &str = "gossip";
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Node for BCaster {
|
impl Node for BCaster {
|
||||||
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
||||||
let typ = req.get_type();
|
let typ = req.get_type();
|
||||||
let frm = req.src.as_str();
|
let frm = req.src.as_str();
|
||||||
let mid = req.body.msg_id;
|
let mid = req.body.msg_id;
|
||||||
let nid = runtime.node_id();
|
|
||||||
let key = format!("{frm}{mid}");
|
let key = format!("{frm}{mid}");
|
||||||
let key = key.as_str();
|
let key = key.as_str();
|
||||||
|
|
||||||
|
@ -49,17 +46,6 @@ impl Node for BCaster {
|
||||||
}
|
}
|
||||||
|
|
||||||
match typ {
|
match typ {
|
||||||
// "g_ok" => {
|
|
||||||
// let id = req.body.in_reply_to;
|
|
||||||
// let key = (id, frm.to_owned());
|
|
||||||
// eprintln!("{nid} got gossip_ok for msg {id} from {frm}");
|
|
||||||
// {
|
|
||||||
// let mut g = self.gossips.lock().unwrap();
|
|
||||||
// if g.remove(&key).is_some() {
|
|
||||||
// eprintln!("{} removed message {id}", runtime.node_id());
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
"broadcast" => {
|
"broadcast" => {
|
||||||
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
||||||
if let Some(val) = val {
|
if let Some(val) = val {
|
||||||
|
@ -75,7 +61,7 @@ impl Node for BCaster {
|
||||||
]
|
]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect();
|
.collect();
|
||||||
let gossip = MessageBody::from_extra(extra).with_type(GOSSIP);
|
let gossip = MessageBody::from_extra(extra).with_type("gossip");
|
||||||
for node in runtime.neighbours() {
|
for node in runtime.neighbours() {
|
||||||
let id = runtime.next_msg_id();
|
let id = runtime.next_msg_id();
|
||||||
let gossip = gossip.clone().and_msg_id(id);
|
let gossip = gossip.clone().and_msg_id(id);
|
||||||
|
@ -83,7 +69,7 @@ impl Node for BCaster {
|
||||||
let mut g = self.gossips.lock().unwrap();
|
let mut g = self.gossips.lock().unwrap();
|
||||||
g.insert((id, node.to_owned()), gossip.clone());
|
g.insert((id, node.to_owned()), gossip.clone());
|
||||||
}
|
}
|
||||||
runtime.rpc(node, gossip).await.unwrap_or_default();
|
runtime.send(node, gossip).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let body = MessageBody::new().with_type("broadcast_ok");
|
let body = MessageBody::new().with_type("broadcast_ok");
|
||||||
|
@ -91,7 +77,6 @@ impl Node for BCaster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"read" => {
|
"read" => {
|
||||||
eprintln!("{req:?}");
|
|
||||||
let vals = {
|
let vals = {
|
||||||
let g = self.store.lock().unwrap();
|
let g = self.store.lock().unwrap();
|
||||||
g.values().cloned().collect::<Vec<_>>()
|
g.values().cloned().collect::<Vec<_>>()
|
||||||
|
@ -114,15 +99,21 @@ impl Node for BCaster {
|
||||||
{
|
{
|
||||||
// there's only one thread, safe to unwrap
|
// there's only one thread, safe to unwrap
|
||||||
let mut guard = self.store.lock().unwrap();
|
let mut guard = self.store.lock().unwrap();
|
||||||
guard.entry(key.to_string()).or_insert(val);
|
guard.borrow_mut().entry(key.to_string()).or_insert(val);
|
||||||
}
|
}
|
||||||
let body = MessageBody::new().with_type("g_ok");
|
let body = MessageBody::new().with_type("gossip_ok");
|
||||||
return runtime.reply(req, body).await;
|
return runtime.reply(req, body).await;
|
||||||
}
|
}
|
||||||
_ => {
|
"gossip_ok" => {
|
||||||
eprintln!("unknown type: {req:?}");
|
let id = req.body.in_reply_to;
|
||||||
|
let key = (id, frm.to_owned());
|
||||||
|
{
|
||||||
|
let mut g = self.gossips.lock().unwrap();
|
||||||
|
g.remove(&key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
done(runtime, req)
|
done(runtime, req)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,5 +5,6 @@ version.workspace = true
|
||||||
authors.workspace = true
|
authors.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde_json.workspace = true
|
async-trait.workspace = true
|
||||||
maelstrom-protocol = { path = "../maelstrom-protocol" }
|
maelstrom-node.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
|
|
|
@ -1,139 +1,29 @@
|
||||||
use maelstrom_protocol as proto;
|
use async_trait::async_trait;
|
||||||
use proto::{Body, Message};
|
use maelstrom::protocol::Message;
|
||||||
use std::{
|
use maelstrom::{done, Node, Result, Runtime};
|
||||||
cell::OnceCell,
|
use std::sync::Arc;
|
||||||
io::{BufRead, StdinLock, StdoutLock, Write},
|
|
||||||
sync::Mutex,
|
|
||||||
};
|
|
||||||
|
|
||||||
fn main() {
|
#[tokio::main]
|
||||||
let out = std::io::stdout().lock();
|
async fn main() {
|
||||||
let input = std::io::stdin().lock();
|
let handler = Arc::new(Handler::default());
|
||||||
|
let _ = Runtime::new()
|
||||||
let runner = &Runner::new(out);
|
.with_handler(handler)
|
||||||
let handler = |msg: &Message| {
|
.run()
|
||||||
let typ = &msg.body.typ;
|
.await
|
||||||
match typ.as_str() {
|
.unwrap_or_default();
|
||||||
"echo" => {
|
|
||||||
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
|
|
||||||
runner.reply(&msg, body);
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
runner.run(input, &handler);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Runner<'io> {
|
#[derive(Clone, Default)]
|
||||||
msg_id: Mutex<u64>,
|
struct Handler {}
|
||||||
node_id: OnceCell<String>,
|
|
||||||
nodes: OnceCell<Vec<String>>,
|
|
||||||
output: Mutex<StdoutLock<'io>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'io> Runner<'io> {
|
#[async_trait]
|
||||||
pub fn new(output: StdoutLock<'io>) -> Self {
|
impl Node for Handler {
|
||||||
Runner {
|
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
|
||||||
output: Mutex::new(output),
|
if req.get_type() == "echo" {
|
||||||
msg_id: Mutex::new(1),
|
let echo = req.body.clone().with_type("echo_ok");
|
||||||
nodes: OnceCell::new(),
|
return runtime.reply(req, echo).await;
|
||||||
node_id: OnceCell::new(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&self, input: StdinLock, handler: &dyn Fn(&Message)) {
|
done(runtime, req)
|
||||||
for line in input.lines() {
|
|
||||||
match line {
|
|
||||||
Ok(line) => {
|
|
||||||
if let Ok(msg) = serde_json::from_str::<proto::Message>(&line) {
|
|
||||||
let typ = &msg.body.typ;
|
|
||||||
match typ.as_str() {
|
|
||||||
"init" => {
|
|
||||||
self.init(&msg);
|
|
||||||
|
|
||||||
let body = Body::from_type("init_ok");
|
|
||||||
self.reply(&msg, body);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
handler(&msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn node_id(&self) -> String {
|
|
||||||
self.node_id.get().cloned().unwrap_or("".to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn msg_id(&self) -> u64 {
|
|
||||||
*self.msg_id.lock().unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn init(&self, msg: &Message) {
|
|
||||||
let node_id = msg
|
|
||||||
.body
|
|
||||||
.payload
|
|
||||||
.get("node_id")
|
|
||||||
.unwrap()
|
|
||||||
.as_str()
|
|
||||||
.unwrap()
|
|
||||||
.to_owned();
|
|
||||||
let nodes = msg
|
|
||||||
.body
|
|
||||||
.payload
|
|
||||||
.get("node_ids")
|
|
||||||
.unwrap()
|
|
||||||
.as_array()
|
|
||||||
.unwrap()
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.as_str().unwrap().to_string())
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let _ = self.node_id.get_or_init(|| node_id.to_owned());
|
|
||||||
let _ = self.nodes.get_or_init(|| nodes.to_vec());
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn reply(&self, req: &Message, body: Body) {
|
|
||||||
let mut body = body;
|
|
||||||
let src = self.node_id.get().unwrap().to_owned();
|
|
||||||
let dest = req.src.clone();
|
|
||||||
let in_reply_to = req.body.msg_id;
|
|
||||||
body.in_reply_to = in_reply_to;
|
|
||||||
let msg = Message { src, dest, body };
|
|
||||||
self.send(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send(&self, msg: Message) {
|
|
||||||
let mut msg = msg;
|
|
||||||
if msg.body.msg_id == 0 {
|
|
||||||
let mid = {
|
|
||||||
let mut g = self.msg_id.lock().unwrap();
|
|
||||||
let m = *g;
|
|
||||||
*g += 1;
|
|
||||||
m
|
|
||||||
};
|
|
||||||
msg.body.msg_id = mid;
|
|
||||||
}
|
|
||||||
let msg = serde_json::to_string(&msg).unwrap();
|
|
||||||
let msg = format!("{msg}\n");
|
|
||||||
self.writeln(&msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn writeln(&self, msg: &str) {
|
|
||||||
let mut out = self.output.lock().unwrap();
|
|
||||||
out.write_all(msg.as_bytes()).unwrap();
|
|
||||||
out.flush().unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn message(dest: &str, src: &str, body: Body) -> Message {
|
|
||||||
Message {
|
|
||||||
dest: dest.to_owned(),
|
|
||||||
src: src.to_owned(),
|
|
||||||
body,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "maelstrom-protocol"
|
|
||||||
edition = "2021"
|
|
||||||
version.workspace = true
|
|
||||||
authors.workspace = true
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
serde = { version = "1", default-features = false, features = ["derive"] }
|
|
||||||
serde_json = { version = "1", default-features = false, features = ["std"] }
|
|
||||||
serde_repr = "0.1"
|
|
|
@ -1,116 +0,0 @@
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_json::{Map, Value};
|
|
||||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
|
||||||
|
|
||||||
pub type Payload = Map<String, Value>;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Message {
|
|
||||||
pub src: String,
|
|
||||||
pub dest: String,
|
|
||||||
pub body: Body,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Body {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub typ: String,
|
|
||||||
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
|
||||||
pub msg_id: u64,
|
|
||||||
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
|
|
||||||
pub in_reply_to: u64,
|
|
||||||
#[serde(flatten)]
|
|
||||||
pub payload: Payload,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Body {
|
|
||||||
pub fn from_type(typ: &str) -> Self {
|
|
||||||
Body {
|
|
||||||
typ: typ.to_string(),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_msg_id(self, msg_id: u64) -> Self {
|
|
||||||
let mut b = self;
|
|
||||||
b.msg_id = msg_id;
|
|
||||||
b
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_in_reply_to(self, in_reply_to: u64) -> Self {
|
|
||||||
let mut b = self;
|
|
||||||
b.in_reply_to = in_reply_to;
|
|
||||||
b
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_payload(self, payload: Payload) -> Self {
|
|
||||||
let mut b = self;
|
|
||||||
b.payload = payload;
|
|
||||||
b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
|
|
||||||
Body::from_type("init_ok")
|
|
||||||
.with_msg_id(msg_id)
|
|
||||||
.with_in_reply_to(in_reply_to)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
#[serde(untagged)]
|
|
||||||
pub enum ErrorCode {
|
|
||||||
Definite(DefiniteError),
|
|
||||||
Indefinite(IndefiniteError),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
|
|
||||||
#[repr(u64)]
|
|
||||||
pub enum DefiniteError {
|
|
||||||
NodeNotFound = 2,
|
|
||||||
NotSupported = 10,
|
|
||||||
TemporarilyUnavailable = 11,
|
|
||||||
MalformedRequest = 12,
|
|
||||||
Abort = 14,
|
|
||||||
KeyNotFound = 20,
|
|
||||||
KeyAlreadyExists = 21,
|
|
||||||
PreconditionFailed = 22,
|
|
||||||
TxnConflict = 30,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
|
|
||||||
#[repr(u64)]
|
|
||||||
pub enum IndefiniteError {
|
|
||||||
Timeout = 0,
|
|
||||||
Crash = 13,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
|
|
||||||
Body {
|
|
||||||
typ: "error".to_string(),
|
|
||||||
msg_id,
|
|
||||||
in_reply_to,
|
|
||||||
payload: [
|
|
||||||
("code".to_string(), serde_json::to_value(code).unwrap()),
|
|
||||||
("text".to_string(), serde_json::to_value(text).unwrap()),
|
|
||||||
]
|
|
||||||
.into_iter()
|
|
||||||
.collect(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::trivially_copy_pass_by_ref)]
|
|
||||||
fn u64_zero_by_ref(num: &u64) -> bool {
|
|
||||||
*num == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn error_codes() {
|
|
||||||
let ec = ErrorCode::Definite(DefiniteError::Abort);
|
|
||||||
let e = serde_json::to_string(&ec).unwrap();
|
|
||||||
assert_eq!(&e, "14");
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue