new IO works for old challenges.

This commit is contained in:
Joe Ardent 2024-06-02 11:13:31 -07:00
parent e6d76ba37c
commit 6f45521660
5 changed files with 81 additions and 146 deletions

View File

@ -1,7 +1,6 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
io::BufRead, sync::{Arc, Mutex},
sync::{mpsc::channel, Arc, Mutex},
thread, thread,
time::Duration, time::Duration,
}; };
@ -10,28 +9,18 @@ use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
use rand::Rng; use rand::Rng;
fn main() { fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let input = tx.clone();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
input.send(msg).unwrap();
}
}
});
let node = BCaster::default(); let node = BCaster::default();
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node);
let runner = &runner; let runner = &runner;
let g = thread::spawn(move || loop { runner.run(Some(Box::new(on_init)));
}
fn on_init(runner: &Runner) {
let tx = runner.get_input();
thread::spawn(move || loop {
let millis = rand::thread_rng().gen_range(400..=800); let millis = rand::thread_rng().gen_range(400..=800);
thread::sleep(Duration::from_millis(millis)); thread::sleep(Duration::from_millis(millis));
let body = Body::from_type("do_gossip"); let body = Body::from_type("do_gossip");
@ -41,10 +30,6 @@ fn main() {
}; };
tx.send(msg).unwrap(); tx.send(msg).unwrap();
}); });
runner.run(rx, None);
let _ = i.join();
let _ = g.join();
} }
#[derive(Clone, Default)] #[derive(Clone, Default)]

View File

@ -1,8 +1,4 @@
use std::{ use std::sync::{Arc, Mutex};
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread,
};
use nebkor_maelstrom::{Body, Message, Node, Runner}; use nebkor_maelstrom::{Body, Message, Node, Runner};
@ -19,26 +15,11 @@ impl Node for Echo {
} }
fn main() { fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
tx.send(msg).unwrap();
}
}
});
let node = Echo; let node = Echo;
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node);
runner.run(rx, None); runner.run(None);
i.join().unwrap();
} }

View File

@ -1,17 +1,12 @@
use std::{ use std::sync::{Arc, Mutex};
rc::Rc,
sync::{Arc, Mutex},
};
use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner}; use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
fn main() { fn main() {
let node = Counter; let node = Counter;
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Rc::new(Runner::new(node)); let runner = Runner::new(node);
let (i, _, rx) = mk_stdin();
let on_init = |rnr: &Runner| { let on_init = |rnr: &Runner| {
let payload = mk_payload(&[ let payload = mk_payload(&[
@ -26,8 +21,7 @@ fn main() {
let on_init = Box::new(on_init); let on_init = Box::new(on_init);
runner.run(rx, Some(on_init)); runner.run(Some(on_init));
i.join().unwrap();
} }
const KEY: &str = "COUNTER"; const KEY: &str = "COUNTER";
@ -36,52 +30,28 @@ const KEY: &str = "COUNTER";
struct Counter; struct Counter;
impl Node for Counter { impl Node for Counter {
fn handle<'slf>(&'slf mut self, runner: &'slf Rc<Runner>, req: Message) { fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
let typ = req.body.typ.as_str(); let typ = req.body.typ.as_str();
let frm = req.src.clone(); let frm = req.src.clone();
let msg_id = req.body.msg_id.to_owned(); let msg_id = req.body.msg_id.to_owned();
match typ { match typ {
"add" => { "add" => {
let read_runner = runner.clone();
let cas_runner = runner.clone();
let add_req = req.clone();
let cas_handler = move |_msg: Message| {
let req = add_req;
cas_runner.reply(&req, Body::from_type("add_ok"));
};
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap(); let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
let read_handler = move |msg: Message| {
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
let payload = mk_payload(&[
("from", value.into()),
("to", (value + delta).into()),
("key", KEY.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
read_runner.rpc("seq-kv", body, Box::new(cas_handler));
};
// kick it off by calling "read" on seq-kv:
let payload = mk_payload(&[("key", KEY.into())]);
let body = Body::from_type("read").with_payload(payload);
runner.rpc("seq-kv", body, Box::new(read_handler));
} }
"read" => { "read" => {
let rn = runner.clone();
let h = move |msg: Message| {
let src = frm.clone();
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
let irt = msg_id;
let payload = mk_payload(&[("value", value.into())]);
let body = Body::from_type("read_ok")
.with_in_reply_to(irt)
.with_payload(payload);
rn.send(&src, body);
};
let payload = mk_payload(&[("key", KEY.into())]); let payload = mk_payload(&[("key", KEY.into())]);
let body = Body::from_type("read").with_payload(payload); let body = Body::from_type("read").with_payload(payload);
runner.rpc("seq-kv", body, Box::new(h)); let val = runner
.rpc("seq-kv", body)
.recv()
.unwrap()
.body
.payload
.get("value")
.cloned()
.unwrap();
let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)]));
runner.reply(&req, body);
} }
_ => { _ => {
eprintln!("unknown type: {req:?}"); eprintln!("unknown type: {req:?}");

View File

@ -1,33 +1,14 @@
use std::{ use std::sync::{Arc, Mutex};
io::BufRead,
sync::{mpsc::channel, Arc, Mutex},
thread,
};
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner}; use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
fn main() { fn main() {
let out = std::io::stdout();
let std_in = Arc::new(std::io::stdin());
let (tx, rx) = channel();
let i = thread::spawn(move || {
let g = std_in.lock();
for line in g.lines().map_while(Result::ok) {
if let Ok(msg) = serde_json::from_str::<Message>(&line) {
tx.send(msg).unwrap();
}
}
});
let node = GenUid; let node = GenUid;
let node = Arc::new(Mutex::new(node)); let node = Arc::new(Mutex::new(node));
let runner = Runner::new(node, out); let runner = Runner::new(node);
runner.run(rx, None); runner.run(None);
i.join().unwrap();
} }
#[derive(Clone, Default)] #[derive(Clone, Default)]

View File

@ -1,13 +1,12 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
io::{BufRead, Stdout, Write}, io::{BufRead, Write},
rc::Rc,
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicU64, AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender}, mpsc::{channel, Receiver, Sender},
Arc, LazyLock, Mutex, OnceLock, Arc, Mutex, OnceLock,
}, },
thread::{self, JoinHandle}, thread::{self},
}; };
pub mod protocol; pub mod protocol;
@ -18,16 +17,16 @@ use serde_json::Value;
pub mod kv; pub mod kv;
pub type DynNode = Arc<Mutex<dyn Node>>; pub type DynNode = Arc<Mutex<dyn Node>>;
pub type Handler = Box<dyn FnOnce(Message)>; // -> Result<Option<Value>>>;
pub type OnInit = Box<dyn Fn(&Runner)>; pub type OnInit = Box<dyn Fn(&Runner)>;
pub type Result<T> = std::result::Result<T, ErrorCode>; pub type Result<T> = std::result::Result<T, ErrorCode>;
pub type RpcPromise = Receiver<Message>;
static MSG_ID: AtomicU64 = AtomicU64::new(0); static MSG_ID: AtomicU64 = AtomicU64::new(0);
static OUTPUT: LazyLock<Stdout> = LazyLock::new(std::io::stdout);
pub trait Node { pub trait Node {
fn handle(&mut self, runner: &Rc<Runner>, msg: Message); fn handle(&mut self, runner: &Runner, msg: Message);
} }
#[derive(Clone)] #[derive(Clone)]
@ -36,7 +35,9 @@ pub struct Runner {
node_id: OnceLock<String>, node_id: OnceLock<String>,
nodes: OnceLock<Vec<String>>, nodes: OnceLock<Vec<String>>,
steps: Arc<AtomicUsize>, steps: Arc<AtomicUsize>,
handlers: Arc<Mutex<HashMap<u64, Handler>>>, promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
input: OnceLock<Sender<Message>>,
output: OnceLock<Sender<Message>>,
} }
impl Runner { impl Runner {
@ -46,26 +47,36 @@ impl Runner {
nodes: OnceLock::new(), nodes: OnceLock::new(),
node_id: OnceLock::new(), node_id: OnceLock::new(),
steps: Arc::new(AtomicUsize::new(0)), steps: Arc::new(AtomicUsize::new(0)),
handlers: Default::default(), promises: Default::default(),
input: OnceLock::new(),
output: OnceLock::new(),
} }
} }
pub fn run(self: &Rc<Self>, input: Receiver<Message>, on_init: Option<OnInit>) { pub fn run(&self, on_init: Option<OnInit>) {
for msg in input.iter() { let (stdin_tx, stdin_rx) = run_stdin();
let _ = self.input.get_or_init(|| stdin_tx);
let (stdout_tx, stdout_rx) = channel();
let _ = self.output.get_or_init(|| stdout_tx);
run_stdout(stdout_rx);
for msg in stdin_rx {
let typ = &msg.body.typ; let typ = &msg.body.typ;
if let "init" = typ.as_str() { if let "init" = typ.as_str() {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
self.reply(&msg, body); self.reply(&msg, body);
if let Some(ref h) = on_init { if let Some(ref on_init) = on_init {
h(self); on_init(self);
} }
} else { } else {
let irt = msg.body.in_reply_to; let irt = msg.body.in_reply_to;
{ {
let mut g = self.handlers.lock().unwrap(); let mut g = self.promises.lock().unwrap();
if let Some(h) = g.remove(&irt) { if let Some(h) = g.remove(&irt) {
h(msg.clone()); h.send(msg.clone()).unwrap();
continue;
} }
} }
let mut n = self.node.lock().unwrap(); let mut n = self.node.lock().unwrap();
@ -74,15 +85,21 @@ impl Runner {
} }
} }
pub fn rpc(&self, dest: &str, body: Body, handler: Handler) { pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
let mut body = body; let mut body = body;
let msg_id = self.next_msg_id(); let msg_id = self.next_msg_id();
body.msg_id = msg_id; body.msg_id = msg_id;
let (tx, rx) = channel();
{ {
let mut g = self.handlers.lock().unwrap(); let mut g = self.promises.lock().unwrap();
g.insert(msg_id, handler); g.insert(msg_id, tx);
} }
self.send(dest, body); self.send(dest, body);
rx
}
pub fn get_input(&self) -> Sender<Message> {
self.input.get().cloned().unwrap()
} }
pub fn node_id(&self) -> String { pub fn node_id(&self) -> String {
@ -124,10 +141,10 @@ impl Runner {
.unwrap() .unwrap()
.iter() .iter()
.map(|s| s.as_str().unwrap().to_string()) .map(|s| s.as_str().unwrap().to_string())
.collect::<Vec<_>>(); .collect();
let _ = self.node_id.get_or_init(|| node_id.to_owned()); let _ = self.node_id.get_or_init(|| node_id);
let _ = self.nodes.get_or_init(|| nodes.to_vec()); let _ = self.nodes.get_or_init(|| nodes);
} }
pub fn reply(&self, req: &Message, body: Body) { pub fn reply(&self, req: &Message, body: Body) {
@ -148,27 +165,19 @@ impl Runner {
dest: dest.to_string(), dest: dest.to_string(),
body, body,
}; };
let msg = serde_json::to_string(&msg).unwrap(); self.output.get().unwrap().send(msg).unwrap();
self.writeln(&msg);
}
fn writeln(&self, msg: &str) {
let mut out = OUTPUT.lock();
let msg = format!("{msg}\n");
out.write_all(msg.as_bytes()).unwrap();
out.flush().unwrap();
} }
} }
/// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used /// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used
/// in the Runner::run() method. Clone the Sender if you want to inject messages /// in the Runner::run() method. Clone the Sender if you want to inject messages
/// into the Runner. Join the handle after `run()`. /// into the Runner. Join the handle after `run()`.
pub fn mk_stdin() -> (JoinHandle<()>, Sender<Message>, Receiver<Message>) { pub fn run_stdin() -> (Sender<Message>, Receiver<Message>) {
let stdin = std::io::stdin(); let stdin = std::io::stdin();
let (tx, rx) = channel(); let (tx, rx) = channel();
let xtra_input = tx.clone(); let xtra_input = tx.clone();
let join = thread::spawn(move || { thread::spawn(move || {
let g = stdin.lock(); let g = stdin.lock();
for line in g.lines().map_while(std::result::Result::ok) { for line in g.lines().map_while(std::result::Result::ok) {
if let Ok(msg) = serde_json::from_str(&line) { if let Ok(msg) = serde_json::from_str(&line) {
@ -177,7 +186,16 @@ pub fn mk_stdin() -> (JoinHandle<()>, Sender<Message>, Receiver<Message>) {
} }
}); });
(join, xtra_input, rx) (xtra_input, rx)
}
fn run_stdout(rx: Receiver<Message>) {
thread::spawn(move || {
let mut stdout = std::io::stdout().lock();
while let Ok(msg) = rx.recv() {
writeln!(stdout, "{}", serde_json::to_string(&msg).unwrap()).unwrap();
}
});
} }
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {