Compare commits

...

4 commits

Author SHA1 Message Date
Joe Ardent
634ba4a2ba added runner 2024-05-19 18:58:46 -07:00
Joe Ardent
35e89d7c7d echo works, only external crate is serde 2024-05-19 17:08:47 -07:00
Joe Ardent
11d6734d29 futzing 2024-05-19 15:22:20 -07:00
Joe Ardent
cdc191a3a1 re-add custom maelstrom crate 2024-05-19 15:19:43 -07:00
7 changed files with 303 additions and 40 deletions

25
Cargo.lock generated
View file

@ -198,9 +198,8 @@ dependencies = [
name = "gg-echo" name = "gg-echo"
version = "0.0.1" version = "0.0.1"
dependencies = [ dependencies = [
"async-trait", "maelstrom-protocol",
"maelstrom-node", "serde_json",
"tokio",
] ]
[[package]] [[package]]
@ -288,6 +287,15 @@ dependencies = [
"tokio-util", "tokio-util",
] ]
[[package]]
name = "maelstrom-protocol"
version = "0.0.1"
dependencies = [
"serde",
"serde_json",
"serde_repr",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.2" version = "2.7.2"
@ -473,6 +481,17 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_repr"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.2" version = "1.4.2"

View file

@ -1,5 +1,5 @@
[workspace] [workspace]
members = ["gg-echo", "gg-uid", "gg-broadcast"] members = ["maelstrom-protocol", "gg-echo", "gg-uid", "gg-broadcast"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]

View file

@ -25,12 +25,15 @@ struct BCaster {
pub gossips: Gossips, pub gossips: Gossips,
} }
static GOSSIP: &str = "gossip";
#[async_trait] #[async_trait]
impl Node for BCaster { impl Node for BCaster {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> { async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let typ = req.get_type(); let typ = req.get_type();
let frm = req.src.as_str(); let frm = req.src.as_str();
let mid = req.body.msg_id; let mid = req.body.msg_id;
let nid = runtime.node_id();
let key = format!("{frm}{mid}"); let key = format!("{frm}{mid}");
let key = key.as_str(); let key = key.as_str();
@ -46,6 +49,17 @@ impl Node for BCaster {
} }
match typ { match typ {
// "g_ok" => {
// let id = req.body.in_reply_to;
// let key = (id, frm.to_owned());
// eprintln!("{nid} got gossip_ok for msg {id} from {frm}");
// {
// let mut g = self.gossips.lock().unwrap();
// if g.remove(&key).is_some() {
// eprintln!("{} removed message {id}", runtime.node_id());
// }
// }
// }
"broadcast" => { "broadcast" => {
let val = req.body.extra.get("message").and_then(|v| v.as_i64()); let val = req.body.extra.get("message").and_then(|v| v.as_i64());
if let Some(val) = val { if let Some(val) = val {
@ -61,7 +75,7 @@ impl Node for BCaster {
] ]
.into_iter() .into_iter()
.collect(); .collect();
let gossip = MessageBody::from_extra(extra).with_type("gossip"); let gossip = MessageBody::from_extra(extra).with_type(GOSSIP);
for node in runtime.neighbours() { for node in runtime.neighbours() {
let id = runtime.next_msg_id(); let id = runtime.next_msg_id();
let gossip = gossip.clone().and_msg_id(id); let gossip = gossip.clone().and_msg_id(id);
@ -69,7 +83,7 @@ impl Node for BCaster {
let mut g = self.gossips.lock().unwrap(); let mut g = self.gossips.lock().unwrap();
g.insert((id, node.to_owned()), gossip.clone()); g.insert((id, node.to_owned()), gossip.clone());
} }
runtime.send(node, gossip).await?; runtime.rpc(node, gossip).await.unwrap_or_default();
} }
let body = MessageBody::new().with_type("broadcast_ok"); let body = MessageBody::new().with_type("broadcast_ok");
@ -77,6 +91,7 @@ impl Node for BCaster {
} }
} }
"read" => { "read" => {
eprintln!("{req:?}");
let vals = { let vals = {
let g = self.store.lock().unwrap(); let g = self.store.lock().unwrap();
g.values().cloned().collect::<Vec<_>>() g.values().cloned().collect::<Vec<_>>()
@ -99,21 +114,15 @@ impl Node for BCaster {
{ {
// there's only one thread, safe to unwrap // there's only one thread, safe to unwrap
let mut guard = self.store.lock().unwrap(); let mut guard = self.store.lock().unwrap();
guard.borrow_mut().entry(key.to_string()).or_insert(val); guard.entry(key.to_string()).or_insert(val);
} }
let body = MessageBody::new().with_type("gossip_ok"); let body = MessageBody::new().with_type("g_ok");
return runtime.reply(req, body).await; return runtime.reply(req, body).await;
} }
"gossip_ok" => { _ => {
let id = req.body.in_reply_to; eprintln!("unknown type: {req:?}");
let key = (id, frm.to_owned());
{
let mut g = self.gossips.lock().unwrap();
g.remove(&key);
} }
} }
_ => {}
}
done(runtime, req) done(runtime, req)
} }

View file

@ -5,6 +5,5 @@ version.workspace = true
authors.workspace = true authors.workspace = true
[dependencies] [dependencies]
async-trait.workspace = true serde_json.workspace = true
maelstrom-node.workspace = true maelstrom-protocol = { path = "../maelstrom-protocol" }
tokio.workspace = true

View file

@ -1,29 +1,139 @@
use async_trait::async_trait; use maelstrom_protocol as proto;
use maelstrom::protocol::Message; use proto::{Body, Message};
use maelstrom::{done, Node, Result, Runtime}; use std::{
use std::sync::Arc; cell::OnceCell,
io::{BufRead, StdinLock, StdoutLock, Write},
sync::Mutex,
};
#[tokio::main] fn main() {
async fn main() { let out = std::io::stdout().lock();
let handler = Arc::new(Handler::default()); let input = std::io::stdin().lock();
let _ = Runtime::new()
.with_handler(handler) let runner = &Runner::new(out);
.run() let handler = |msg: &Message| {
.await let typ = &msg.body.typ;
.unwrap_or_default(); match typ.as_str() {
"echo" => {
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
runner.reply(&msg, body);
}
_ => {}
}
};
runner.run(input, &handler);
} }
#[derive(Clone, Default)] struct Runner<'io> {
struct Handler {} msg_id: Mutex<u64>,
node_id: OnceCell<String>,
#[async_trait] nodes: OnceCell<Vec<String>>,
impl Node for Handler { output: Mutex<StdoutLock<'io>>,
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
if req.get_type() == "echo" {
let echo = req.body.clone().with_type("echo_ok");
return runtime.reply(req, echo).await;
} }
done(runtime, req) impl<'io> Runner<'io> {
pub fn new(output: StdoutLock<'io>) -> Self {
Runner {
output: Mutex::new(output),
msg_id: Mutex::new(1),
nodes: OnceCell::new(),
node_id: OnceCell::new(),
}
}
pub fn run(&self, input: StdinLock, handler: &dyn Fn(&Message)) {
for line in input.lines() {
match line {
Ok(line) => {
if let Ok(msg) = serde_json::from_str::<proto::Message>(&line) {
let typ = &msg.body.typ;
match typ.as_str() {
"init" => {
self.init(&msg);
let body = Body::from_type("init_ok");
self.reply(&msg, body);
}
_ => {
handler(&msg);
}
}
}
}
_ => {}
}
}
}
pub fn node_id(&self) -> String {
self.node_id.get().cloned().unwrap_or("".to_string())
}
pub fn msg_id(&self) -> u64 {
*self.msg_id.lock().unwrap()
}
pub fn init(&self, msg: &Message) {
let node_id = msg
.body
.payload
.get("node_id")
.unwrap()
.as_str()
.unwrap()
.to_owned();
let nodes = msg
.body
.payload
.get("node_ids")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|s| s.as_str().unwrap().to_string())
.collect::<Vec<_>>();
let _ = self.node_id.get_or_init(|| node_id.to_owned());
let _ = self.nodes.get_or_init(|| nodes.to_vec());
}
pub fn reply(&self, req: &Message, body: Body) {
let mut body = body;
let src = self.node_id.get().unwrap().to_owned();
let dest = req.src.clone();
let in_reply_to = req.body.msg_id;
body.in_reply_to = in_reply_to;
let msg = Message { src, dest, body };
self.send(msg);
}
pub fn send(&self, msg: Message) {
let mut msg = msg;
if msg.body.msg_id == 0 {
let mid = {
let mut g = self.msg_id.lock().unwrap();
let m = *g;
*g += 1;
m
};
msg.body.msg_id = mid;
}
let msg = serde_json::to_string(&msg).unwrap();
let msg = format!("{msg}\n");
self.writeln(&msg);
}
fn writeln(&self, msg: &str) {
let mut out = self.output.lock().unwrap();
out.write_all(msg.as_bytes()).unwrap();
out.flush().unwrap();
}
}
fn message(dest: &str, src: &str, body: Body) -> Message {
Message {
dest: dest.to_owned(),
src: src.to_owned(),
body,
} }
} }

View file

@ -0,0 +1,10 @@
[package]
name = "maelstrom-protocol"
edition = "2021"
version.workspace = true
authors.workspace = true
[dependencies]
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["std"] }
serde_repr = "0.1"

View file

@ -0,0 +1,116 @@
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use serde_repr::{Deserialize_repr, Serialize_repr};
pub type Payload = Map<String, Value>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
pub src: String,
pub dest: String,
pub body: Body,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Body {
#[serde(rename = "type")]
pub typ: String,
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
pub msg_id: u64,
#[serde(default, skip_serializing_if = "u64_zero_by_ref")]
pub in_reply_to: u64,
#[serde(flatten)]
pub payload: Payload,
}
impl Body {
pub fn from_type(typ: &str) -> Self {
Body {
typ: typ.to_string(),
..Default::default()
}
}
pub fn with_msg_id(self, msg_id: u64) -> Self {
let mut b = self;
b.msg_id = msg_id;
b
}
pub fn with_in_reply_to(self, in_reply_to: u64) -> Self {
let mut b = self;
b.in_reply_to = in_reply_to;
b
}
pub fn with_payload(self, payload: Payload) -> Self {
let mut b = self;
b.payload = payload;
b
}
}
pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
Body::from_type("init_ok")
.with_msg_id(msg_id)
.with_in_reply_to(in_reply_to)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ErrorCode {
Definite(DefiniteError),
Indefinite(IndefiniteError),
}
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
#[repr(u64)]
pub enum DefiniteError {
NodeNotFound = 2,
NotSupported = 10,
TemporarilyUnavailable = 11,
MalformedRequest = 12,
Abort = 14,
KeyNotFound = 20,
KeyAlreadyExists = 21,
PreconditionFailed = 22,
TxnConflict = 30,
}
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
#[repr(u64)]
pub enum IndefiniteError {
Timeout = 0,
Crash = 13,
}
pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) -> Body {
Body {
typ: "error".to_string(),
msg_id,
in_reply_to,
payload: [
("code".to_string(), serde_json::to_value(code).unwrap()),
("text".to_string(), serde_json::to_value(text).unwrap()),
]
.into_iter()
.collect(),
}
}
#[allow(clippy::trivially_copy_pass_by_ref)]
fn u64_zero_by_ref(num: &u64) -> bool {
*num == 0
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn error_codes() {
let ec = ErrorCode::Definite(DefiniteError::Abort);
let e = serde_json::to_string(&ec).unwrap();
assert_eq!(&e, "14");
}
}