Make handle take Message by value.

This makes closures easier to deal with in RPC calls.
This commit is contained in:
Joe Ardent 2024-05-29 13:55:29 -07:00
parent 9e1d05983d
commit 06a1b58e4e
6 changed files with 53 additions and 51 deletions

View file

@ -106,7 +106,7 @@ impl BCaster {
} }
impl Node for BCaster { impl Node for BCaster {
fn handle(&mut self, runner: &Runner, req: &Message) { fn handle(&mut self, runner: &Runner, req: Message) {
let typ = req.body.typ.as_str(); let typ = req.body.typ.as_str();
let frm = req.src.as_str(); let frm = req.src.as_str();
let nid = runner.node_id(); let nid = runner.node_id();
@ -121,7 +121,7 @@ impl Node for BCaster {
if let Some(val) = val { if let Some(val) = val {
self.store.insert(val); self.store.insert(val);
let body = Body::from_type("broadcast_ok"); let body = Body::from_type("broadcast_ok");
runner.reply(req, body); runner.reply(&req, body);
} }
} }
"read" => { "read" => {
@ -131,12 +131,12 @@ impl Node for BCaster {
.collect(); .collect();
let body = Body::from_type("read_ok").with_payload(payload); let body = Body::from_type("read_ok").with_payload(payload);
runner.reply(req, body); runner.reply(&req, body);
} }
"topology" => { "topology" => {
self.topology(runner.nodes(), nid); self.topology(runner.nodes(), nid);
let body = Body::from_type("topology_ok"); let body = Body::from_type("topology_ok");
runner.reply(req, body); runner.reply(&req, body);
} }
"gossip" => { "gossip" => {
let goss = req let goss = req

View file

@ -9,11 +9,11 @@ use nebkor_maelstrom::{Body, Message, Node, Runner};
struct Echo; struct Echo;
impl Node for Echo { impl Node for Echo {
fn handle(&mut self, runner: &Runner, msg: &Message) { fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = &msg.body.typ; let typ = &msg.body.typ;
if typ.as_str() == "echo" { if typ.as_str() == "echo" {
let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone()); let body = Body::from_type("echo_ok").with_payload(msg.body.payload.clone());
runner.reply(msg, body); runner.reply(&msg, body);
} }
} }
} }

View file

@ -1,13 +1,10 @@
use std::{ use std::{
collections::{HashMap, HashSet}, sync::{Arc, Mutex},
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread, thread,
time::Duration, time::Duration,
}; };
use nebkor_maelstrom::{mk_payload, mk_stdin, protocol::Payload, Body, Message, Node, Runner}; use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner};
use rand::Rng;
fn main() { fn main() {
let out = std::io::stdout(); let out = std::io::stdout();
@ -16,11 +13,10 @@ fn main() {
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node, out);
let runner = &runner;
let (i, extra_input, rx) = mk_stdin(); let (i, extra_input, rx) = mk_stdin();
let init = thread::spawn(move || { let init = thread::spawn(move || {
thread::sleep(Duration::from_millis(101)); thread::sleep(Duration::from_millis(10));
extra_input extra_input
.send(Message { .send(Message {
body: Body::from_type("kv_init"), body: Body::from_type("kv_init"),
@ -40,18 +36,13 @@ const KEY: &str = "COUNTER";
struct Counter; struct Counter;
impl Node for Counter { impl Node for Counter {
fn handle(&mut self, runner: &Runner, req: &Message) { fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
let typ = req.body.typ.as_str(); let typ = req.body.typ.as_str();
let frm = req.src.as_str(); let frm = req.src.clone();
let nid = runner.node_id(); let msg_id = req.body.msg_id.to_owned();
let nid = nid.as_str();
let msg_id = req.body.msg_id;
match typ { match typ {
"kv_init" => { "kv_init" => {
let mut nodes = runner.nodes().to_vec();
nodes.sort_unstable();
if nodes[0] == nid {
let payload = mk_payload(&[ let payload = mk_payload(&[
("key", KEY.into()), ("key", KEY.into()),
("from", 0i64.into()), ("from", 0i64.into()),
@ -61,15 +52,25 @@ impl Node for Counter {
let body = Body::from_type("cas").with_payload(payload); let body = Body::from_type("cas").with_payload(payload);
runner.send("seq-kv", body); runner.send("seq-kv", body);
} }
}
"add" => { "add" => {
let mut h = |msg: &Message| { runner.reply(&req, Body::from_type("add_ok"));
let irt = msg_id; }
let body = Body::from_type("add_ok").with_in_reply_to(irt); "read" => {
runner.send(frm, body); let rn = runner.clone();
}; let h = move |msg: Message| {
let src = frm.clone();
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
let irt = msg_id;
let payload = mk_payload(&[("value", value.into())]);
let body = Body::from_type("read_ok")
.with_in_reply_to(irt)
.with_payload(payload);
rn.send(&src, body);
};
let payload = mk_payload(&[("key", KEY.into())]);
let body = Body::from_type("read").with_payload(payload);
runner.rpc("seq-kv", body, Box::new(h));
} }
"read" => {}
_ => { _ => {
eprintln!("unknown type: {req:?}"); eprintln!("unknown type: {req:?}");
} }

View file

@ -34,7 +34,7 @@ fn main() {
struct GenUid; struct GenUid;
impl Node for GenUid { impl Node for GenUid {
fn handle(&mut self, runner: &Runner, msg: &Message) { fn handle(&mut self, runner: &Runner, msg: Message) {
let id = runner.node_id(); let id = runner.node_id();
let mid = runner.next_msg_id(); let mid = runner.next_msg_id();
if msg.body.typ == "generate" { if msg.body.typ == "generate" {
@ -45,7 +45,7 @@ impl Node for GenUid {
.with_msg_id(mid) .with_msg_id(mid)
.with_payload(payload); .with_payload(payload);
runner.reply(msg, body); runner.reply(&msg, body);
} }
} }
} }

View file

@ -1,4 +1,4 @@
use serde::{Deserialize, Serialize}; /*
use serde_json::Value; use serde_json::Value;
use crate::Body; use crate::Body;
@ -22,7 +22,8 @@ impl Kv {
todo!() todo!()
} }
pub fn cas(&self, val: Value) -> Body { pub fn cas(&self, key: &str, from: Value, to: Value, create: bool) -> Body {
todo!() todo!()
} }
} }
*/

View file

@ -1,5 +1,4 @@
use std::{ use std::{
any::Any,
collections::HashMap, collections::HashMap,
io::{BufRead, Stdout, Write}, io::{BufRead, Stdout, Write},
sync::{ sync::{
@ -20,15 +19,16 @@ pub type DynNode = Arc<Mutex<dyn Node>>;
pub type Handler = Box<dyn FnMut(&Message)>; pub type Handler = Box<dyn FnMut(&Message)>;
pub trait Node { pub trait Node {
fn handle(&mut self, runner: &Runner, msg: &Message); fn handle(&mut self, runner: &Runner, msg: Message);
} }
#[derive(Clone)]
pub struct Runner { pub struct Runner {
msg_id: AtomicU64, msg_id: Arc<AtomicU64>,
node: DynNode, node: DynNode,
node_id: OnceLock<String>, node_id: OnceLock<String>,
nodes: OnceLock<Vec<String>>, nodes: OnceLock<Vec<String>>,
steps: AtomicUsize, steps: Arc<AtomicUsize>,
output: Arc<Mutex<Stdout>>, output: Arc<Mutex<Stdout>>,
handlers: Arc<Mutex<HashMap<u64, Handler>>>, handlers: Arc<Mutex<HashMap<u64, Handler>>>,
} }
@ -37,10 +37,10 @@ impl Runner {
pub fn new(node: DynNode, output: Stdout) -> Self { pub fn new(node: DynNode, output: Stdout) -> Self {
Runner { Runner {
node, node,
msg_id: AtomicU64::new(1), msg_id: Arc::new(AtomicU64::new(1)),
nodes: OnceLock::new(), nodes: OnceLock::new(),
node_id: OnceLock::new(), node_id: OnceLock::new(),
steps: AtomicUsize::new(0), steps: Arc::new(AtomicUsize::new(0)),
output: Arc::new(Mutex::new(output)), output: Arc::new(Mutex::new(output)),
handlers: Default::default(), handlers: Default::default(),
} }
@ -57,12 +57,12 @@ impl Runner {
let irt = msg.body.in_reply_to; let irt = msg.body.in_reply_to;
{ {
let mut g = self.handlers.lock().unwrap(); let mut g = self.handlers.lock().unwrap();
if let Some(mut h) = g.remove(&irt) { if let Some(h) = g.remove(&irt) {
h(&msg); h(msg.clone());
} }
} }
let mut n = self.node.lock().unwrap(); let mut n = self.node.lock().unwrap();
n.handle(self, &msg); n.handle(self, msg);
} }
} }
} }
@ -125,10 +125,10 @@ impl Runner {
pub fn reply(&self, req: &Message, body: Body) { pub fn reply(&self, req: &Message, body: Body) {
let mut body = body; let mut body = body;
let dest = req.src.clone(); let dest = req.src.as_str();
let in_reply_to = req.body.msg_id; let in_reply_to = req.body.msg_id;
body.in_reply_to = in_reply_to; body.in_reply_to = in_reply_to;
self.send(&dest, body); self.send(dest, body);
} }
pub fn send(&self, dest: &str, body: Body) { pub fn send(&self, dest: &str, body: Body) {