Move all lib-y code into nebkor-maelstrom, have gg-echo use it fully.

This commit is contained in:
Joe Ardent 2024-05-20 12:42:11 -07:00
parent 634ba4a2ba
commit 851e9a6711
8 changed files with 152 additions and 139 deletions

20
Cargo.lock generated
View file

@ -198,7 +198,7 @@ dependencies = [
name = "gg-echo"
version = "0.0.1"
dependencies = [
"maelstrom-protocol",
"nebkor-maelstrom",
"serde_json",
]
@ -287,15 +287,6 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "maelstrom-protocol"
version = "0.0.1"
dependencies = [
"serde",
"serde_json",
"serde_repr",
]
[[package]]
name = "memchr"
version = "2.7.2"
@ -322,6 +313,15 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "nebkor-maelstrom"
version = "0.0.1"
dependencies = [
"serde",
"serde_json",
"serde_repr",
]
[[package]]
name = "num_cpus"
version = "1.16.0"

View file

@ -1,5 +1,5 @@
[workspace]
members = ["maelstrom-protocol", "gg-echo", "gg-uid", "gg-broadcast"]
members = ["gg-echo", "gg-uid", "gg-broadcast", "nebkor-maelstrom"]
resolver = "2"
[workspace.package]

View file

@ -1 +1,5 @@
Working through the [Fly.io distributed systems challenge](https://fly.io/dist-sys/), in Rust.
The `nebkor-maelstrom` crate is meant to be roughly equivalent to the Go Maelstrom.Node package, and
provides serde-backed data structures for messages as well as runner for processing messages from
the Maelstrom network. See `gg-echo/src/main.rs` for the simplest possible use of it.

View file

@ -6,4 +6,4 @@ authors.workspace = true
[dependencies]
serde_json.workspace = true
maelstrom-protocol = { path = "../maelstrom-protocol" }
nebkor-maelstrom = { path = "../nebkor-maelstrom" }

View file

@ -1,17 +1,11 @@
use maelstrom_protocol as proto;
use proto::{Body, Message};
use std::{
cell::OnceCell,
io::{BufRead, StdinLock, StdoutLock, Write},
sync::Mutex,
};
use std::{rc::Rc, sync::Mutex};
fn main() {
let out = std::io::stdout().lock();
let input = std::io::stdin().lock();
use nebkor_maelstrom::{Body, Message, Node, Runner};
let runner = &Runner::new(out);
let handler = |msg: &Message| {
struct Echo;
impl Node for Echo {
fn handle(&mut self, runner: &Runner, msg: &Message) {
let typ = &msg.body.typ;
match typ.as_str() {
"echo" => {
@ -20,120 +14,17 @@ fn main() {
}
_ => {}
}
};
runner.run(input, &handler);
}
struct Runner<'io> {
msg_id: Mutex<u64>,
node_id: OnceCell<String>,
nodes: OnceCell<Vec<String>>,
output: Mutex<StdoutLock<'io>>,
}
impl<'io> Runner<'io> {
pub fn new(output: StdoutLock<'io>) -> Self {
Runner {
output: Mutex::new(output),
msg_id: Mutex::new(1),
nodes: OnceCell::new(),
node_id: OnceCell::new(),
}
}
pub fn run(&self, input: StdinLock, handler: &dyn Fn(&Message)) {
for line in input.lines() {
match line {
Ok(line) => {
if let Ok(msg) = serde_json::from_str::<proto::Message>(&line) {
let typ = &msg.body.typ;
match typ.as_str() {
"init" => {
self.init(&msg);
let body = Body::from_type("init_ok");
self.reply(&msg, body);
}
_ => {
handler(&msg);
}
}
}
}
_ => {}
}
}
}
pub fn node_id(&self) -> String {
self.node_id.get().cloned().unwrap_or("".to_string())
}
pub fn msg_id(&self) -> u64 {
*self.msg_id.lock().unwrap()
}
pub fn init(&self, msg: &Message) {
let node_id = msg
.body
.payload
.get("node_id")
.unwrap()
.as_str()
.unwrap()
.to_owned();
let nodes = msg
.body
.payload
.get("node_ids")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|s| s.as_str().unwrap().to_string())
.collect::<Vec<_>>();
let _ = self.node_id.get_or_init(|| node_id.to_owned());
let _ = self.nodes.get_or_init(|| nodes.to_vec());
}
pub fn reply(&self, req: &Message, body: Body) {
let mut body = body;
let src = self.node_id.get().unwrap().to_owned();
let dest = req.src.clone();
let in_reply_to = req.body.msg_id;
body.in_reply_to = in_reply_to;
let msg = Message { src, dest, body };
self.send(msg);
}
pub fn send(&self, msg: Message) {
let mut msg = msg;
if msg.body.msg_id == 0 {
let mid = {
let mut g = self.msg_id.lock().unwrap();
let m = *g;
*g += 1;
m
};
msg.body.msg_id = mid;
}
let msg = serde_json::to_string(&msg).unwrap();
let msg = format!("{msg}\n");
self.writeln(&msg);
}
fn writeln(&self, msg: &str) {
let mut out = self.output.lock().unwrap();
out.write_all(msg.as_bytes()).unwrap();
out.flush().unwrap();
}
}
fn message(dest: &str, src: &str, body: Body) -> Message {
Message {
dest: dest.to_owned(),
src: src.to_owned(),
body,
}
fn main() {
let out = std::io::stdout().lock();
let input = std::io::stdin().lock();
let node = Echo;
let node = Rc::new(Mutex::new(node));
let runner = Runner::new(out, node);
runner.run(input);
}

View file

@ -1,10 +1,11 @@
[package]
name = "maelstrom-protocol"
name = "nebkor-maelstrom"
edition = "2021"
version.workspace = true
authors.workspace = true
license-file.workspace = true
[dependencies]
serde_json.workspace = true
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false, features = ["std"] }
serde_repr = "0.1"

117
nebkor-maelstrom/src/lib.rs Normal file
View file

@ -0,0 +1,117 @@
use std::{
cell::OnceCell,
io::{BufRead, StdinLock, StdoutLock, Write},
rc::Rc,
sync::{
atomic::{AtomicU64, Ordering},
Mutex,
},
};
pub mod protocol;
pub use protocol::{Body, Message};
pub type DynNode = Rc<Mutex<dyn Node>>;
pub trait Node {
fn handle(&mut self, runner: &Runner, msg: &Message);
}
pub struct Runner<'io> {
msg_id: AtomicU64,
node: DynNode,
node_id: OnceCell<String>,
nodes: OnceCell<Vec<String>>,
output: Mutex<StdoutLock<'io>>,
steps: AtomicU64,
}
impl<'io> Runner<'io> {
pub fn new(output: StdoutLock<'io>, node: DynNode) -> Self {
Runner {
output: Mutex::new(output),
node,
msg_id: AtomicU64::new(1),
nodes: OnceCell::new(),
node_id: OnceCell::new(),
steps: AtomicU64::new(0),
}
}
pub fn run(&self, input: StdinLock) {
for line in input.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
let typ = &msg.body.typ;
if let "init" = typ.as_str() {
self.init(&msg);
let body = Body::from_type("init_ok");
self.reply(&msg, body);
} else {
let mut n = self.node.lock().unwrap();
n.handle(self, &msg);
}
}
self.steps.fetch_add(1, Ordering::SeqCst);
}
}
pub fn node_id(&self) -> String {
self.node_id.get().cloned().unwrap_or_default()
}
pub fn next_msg_id(&self) -> u64 {
self.msg_id.fetch_add(1, Ordering::SeqCst)
}
pub fn init(&self, msg: &Message) {
let node_id = msg
.body
.payload
.get("node_id")
.unwrap()
.as_str()
.unwrap()
.to_owned();
let nodes = msg
.body
.payload
.get("node_ids")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|s| s.as_str().unwrap().to_string())
.collect::<Vec<_>>();
let _ = self.node_id.get_or_init(|| node_id.to_owned());
let _ = self.nodes.get_or_init(|| nodes.to_vec());
}
pub fn reply(&self, req: &Message, body: Body) {
let mut body = body;
let src = self.node_id.get().unwrap().to_owned();
let dest = req.src.clone();
let in_reply_to = req.body.msg_id;
body.in_reply_to = in_reply_to;
let msg = Message { src, dest, body };
self.send(msg);
}
pub fn send(&self, msg: Message) {
let mut msg = msg;
if msg.body.msg_id == 0 {
let mid = self.next_msg_id();
msg.body.msg_id = mid;
}
let msg = serde_json::to_string(&msg).unwrap();
self.writeln(&msg);
}
fn writeln(&self, msg: &str) {
let mut out = self.output.lock().unwrap();
let msg = format!("{msg}\n");
out.write_all(msg.as_bytes()).unwrap();
out.flush().unwrap();
}
}