framework works
This commit is contained in:
parent
6f45521660
commit
2793508a93
3 changed files with 139 additions and 74 deletions
|
@ -19,7 +19,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_init(runner: &Runner) {
|
fn on_init(runner: &Runner) {
|
||||||
let tx = runner.get_input();
|
let tx = runner.get_backdoor();
|
||||||
thread::spawn(move || loop {
|
thread::spawn(move || loop {
|
||||||
let millis = rand::thread_rng().gen_range(400..=800);
|
let millis = rand::thread_rng().gen_range(400..=800);
|
||||||
thread::sleep(Duration::from_millis(millis));
|
thread::sleep(Duration::from_millis(millis));
|
||||||
|
|
|
@ -2,6 +2,11 @@ use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
||||||
|
|
||||||
|
const KEY: &str = "COUNTER";
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
struct Counter;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let node = Counter;
|
let node = Counter;
|
||||||
let node = Arc::new(Mutex::new(node));
|
let node = Arc::new(Mutex::new(node));
|
||||||
|
@ -24,19 +29,38 @@ fn main() {
|
||||||
runner.run(Some(on_init));
|
runner.run(Some(on_init));
|
||||||
}
|
}
|
||||||
|
|
||||||
const KEY: &str = "COUNTER";
|
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
|
||||||
struct Counter;
|
|
||||||
|
|
||||||
impl Node for Counter {
|
impl Node for Counter {
|
||||||
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
|
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
|
||||||
|
let read_payload = mk_payload(&[("key", KEY.into())]);
|
||||||
|
let read_body = Body::from_type("read").with_payload(read_payload);
|
||||||
let typ = req.body.typ.as_str();
|
let typ = req.body.typ.as_str();
|
||||||
let frm = req.src.clone();
|
//let frm = req.src.clone();
|
||||||
let msg_id = req.body.msg_id.to_owned();
|
//let msg_id = req.body.msg_id.to_owned();
|
||||||
match typ {
|
match typ {
|
||||||
"add" => {
|
"add" => {
|
||||||
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
|
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
|
||||||
|
let cur = runner
|
||||||
|
.rpc("seq-kv", read_body)
|
||||||
|
.recv()
|
||||||
|
.unwrap()
|
||||||
|
.body
|
||||||
|
.payload
|
||||||
|
.get("value")
|
||||||
|
.cloned()
|
||||||
|
.unwrap()
|
||||||
|
.as_i64()
|
||||||
|
.unwrap();
|
||||||
|
let cas_payload = mk_payload(&[
|
||||||
|
("key", KEY.into()),
|
||||||
|
("from", cur.into()),
|
||||||
|
("to", (cur + delta).into()),
|
||||||
|
]);
|
||||||
|
// ERRORS BE HERE
|
||||||
|
runner
|
||||||
|
.rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload))
|
||||||
|
.recv()
|
||||||
|
.unwrap();
|
||||||
|
runner.reply(&req, Body::from_type("add_ok"));
|
||||||
}
|
}
|
||||||
"read" => {
|
"read" => {
|
||||||
let payload = mk_payload(&[("key", KEY.into())]);
|
let payload = mk_payload(&[("key", KEY.into())]);
|
||||||
|
|
|
@ -29,15 +29,45 @@ pub trait Node {
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message);
|
fn handle(&mut self, runner: &Runner, msg: Message);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Network {
|
||||||
|
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
||||||
|
node_output_tx: Sender<Message>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Network {
|
||||||
|
pub fn new() -> (Self, Receiver<Message>) {
|
||||||
|
let (node_output_tx, node_output_rx) = channel();
|
||||||
|
let net = Self {
|
||||||
|
node_output_tx,
|
||||||
|
promises: Arc::new(Mutex::new(HashMap::default())),
|
||||||
|
};
|
||||||
|
(net, node_output_rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&self, msg: Message) {
|
||||||
|
self.node_output_tx.send(msg).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rpc(&self, msg: Message) -> RpcPromise {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
{
|
||||||
|
let msg_id = msg.body.msg_id;
|
||||||
|
let mut g = self.promises.lock().unwrap();
|
||||||
|
g.insert(msg_id, tx);
|
||||||
|
}
|
||||||
|
self.send(msg);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Runner {
|
pub struct Runner {
|
||||||
node: DynNode,
|
node: DynNode,
|
||||||
node_id: OnceLock<String>,
|
node_id: OnceLock<String>,
|
||||||
nodes: OnceLock<Vec<String>>,
|
nodes: OnceLock<Vec<String>>,
|
||||||
|
network: OnceLock<Network>,
|
||||||
|
backdoor: OnceLock<Sender<Message>>,
|
||||||
steps: Arc<AtomicUsize>,
|
steps: Arc<AtomicUsize>,
|
||||||
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
|
||||||
input: OnceLock<Sender<Message>>,
|
|
||||||
output: OnceLock<Sender<Message>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runner {
|
impl Runner {
|
||||||
|
@ -46,24 +76,64 @@ impl Runner {
|
||||||
node,
|
node,
|
||||||
nodes: OnceLock::new(),
|
nodes: OnceLock::new(),
|
||||||
node_id: OnceLock::new(),
|
node_id: OnceLock::new(),
|
||||||
|
network: OnceLock::new(),
|
||||||
|
backdoor: OnceLock::new(),
|
||||||
steps: Arc::new(AtomicUsize::new(0)),
|
steps: Arc::new(AtomicUsize::new(0)),
|
||||||
promises: Default::default(),
|
|
||||||
input: OnceLock::new(),
|
|
||||||
output: OnceLock::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&self, on_init: Option<OnInit>) {
|
pub fn run(&self, on_init: Option<OnInit>) {
|
||||||
let (stdin_tx, stdin_rx) = run_stdin();
|
let (stdin_tx, stdin_rx) = channel();
|
||||||
let _ = self.input.get_or_init(|| stdin_tx);
|
|
||||||
|
|
||||||
let (stdout_tx, stdout_rx) = channel();
|
let (stdout_tx, stdout_rx) = channel();
|
||||||
let _ = self.output.get_or_init(|| stdout_tx);
|
|
||||||
run_stdout(stdout_rx);
|
|
||||||
|
|
||||||
for msg in stdin_rx {
|
thread::spawn(move || {
|
||||||
let typ = &msg.body.typ;
|
let stdin = std::io::stdin().lock().lines();
|
||||||
if let "init" = typ.as_str() {
|
for line in stdin {
|
||||||
|
stdin_tx.send(line.unwrap()).unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut stdout = std::io::stdout().lock();
|
||||||
|
for msg in stdout_rx {
|
||||||
|
writeln!(&mut stdout, "{msg}").unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
self.run_internal(stdout_tx, stdin_rx, on_init);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_internal(
|
||||||
|
&self,
|
||||||
|
stdout_tx: Sender<String>,
|
||||||
|
stdin_rx: Receiver<String>,
|
||||||
|
on_init: Option<OnInit>,
|
||||||
|
) {
|
||||||
|
let (network, node_receiver) = Network::new();
|
||||||
|
let _ = self.network.get_or_init(|| network.clone());
|
||||||
|
self.run_output(stdout_tx, node_receiver);
|
||||||
|
self.run_input(stdin_rx, network, on_init);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_input(&self, stdin_rx: Receiver<String>, network: Network, on_init: Option<OnInit>) {
|
||||||
|
let (json_tx, json_rx) = channel();
|
||||||
|
let _ = self.backdoor.get_or_init(|| json_tx.clone());
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
for line in stdin_rx {
|
||||||
|
let msg: Message = serde_json::from_str(&line).unwrap();
|
||||||
|
let irt = msg.body.in_reply_to;
|
||||||
|
if let Some(tx) = network.promises.lock().unwrap().remove(&irt) {
|
||||||
|
tx.send(msg).unwrap();
|
||||||
|
} else {
|
||||||
|
json_tx.send(msg).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for msg in json_rx {
|
||||||
|
let mut node = self.node.lock().unwrap();
|
||||||
|
if msg.body.typ.as_str() == "init" {
|
||||||
self.init(&msg);
|
self.init(&msg);
|
||||||
let body = Body::from_type("init_ok");
|
let body = Body::from_type("init_ok");
|
||||||
self.reply(&msg, body);
|
self.reply(&msg, body);
|
||||||
|
@ -71,35 +141,22 @@ impl Runner {
|
||||||
on_init(self);
|
on_init(self);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let irt = msg.body.in_reply_to;
|
node.handle(self, msg);
|
||||||
{
|
|
||||||
let mut g = self.promises.lock().unwrap();
|
|
||||||
if let Some(h) = g.remove(&irt) {
|
|
||||||
h.send(msg.clone()).unwrap();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut n = self.node.lock().unwrap();
|
|
||||||
n.handle(self, msg);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
fn run_output(&self, stdout_tx: Sender<String>, node_output_rx: Receiver<Message>) {
|
||||||
let mut body = body;
|
thread::spawn(move || {
|
||||||
let msg_id = self.next_msg_id();
|
while let Ok(msg) = node_output_rx.recv() {
|
||||||
body.msg_id = msg_id;
|
let msg = serde_json::to_string(&msg).unwrap();
|
||||||
let (tx, rx) = channel();
|
stdout_tx.send(msg).unwrap();
|
||||||
{
|
|
||||||
let mut g = self.promises.lock().unwrap();
|
|
||||||
g.insert(msg_id, tx);
|
|
||||||
}
|
}
|
||||||
self.send(dest, body);
|
});
|
||||||
rx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_input(&self) -> Sender<Message> {
|
pub fn get_backdoor(&self) -> Sender<Message> {
|
||||||
self.input.get().cloned().unwrap()
|
self.backdoor.get().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn node_id(&self) -> String {
|
pub fn node_id(&self) -> String {
|
||||||
|
@ -165,37 +222,21 @@ impl Runner {
|
||||||
dest: dest.to_string(),
|
dest: dest.to_string(),
|
||||||
body,
|
body,
|
||||||
};
|
};
|
||||||
self.output.get().unwrap().send(msg).unwrap();
|
self.network.get().unwrap().send(msg);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Feeds lines from stdin to the MPSC Sender, so that the Receiver can be used
|
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
||||||
/// in the Runner::run() method. Clone the Sender if you want to inject messages
|
let mut body = body;
|
||||||
/// into the Runner. Join the handle after `run()`.
|
if body.msg_id == 0 {
|
||||||
pub fn run_stdin() -> (Sender<Message>, Receiver<Message>) {
|
body.msg_id = self.next_msg_id();
|
||||||
let stdin = std::io::stdin();
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
let xtra_input = tx.clone();
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
let g = stdin.lock();
|
|
||||||
for line in g.lines().map_while(std::result::Result::ok) {
|
|
||||||
if let Ok(msg) = serde_json::from_str(&line) {
|
|
||||||
tx.send(msg).unwrap();
|
|
||||||
}
|
}
|
||||||
|
let msg = Message {
|
||||||
|
src: self.node_id().to_string(),
|
||||||
|
dest: dest.to_string(),
|
||||||
|
body,
|
||||||
|
};
|
||||||
|
self.network.get().unwrap().rpc(msg)
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
(xtra_input, rx)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_stdout(rx: Receiver<Message>) {
|
|
||||||
thread::spawn(move || {
|
|
||||||
let mut stdout = std::io::stdout().lock();
|
|
||||||
while let Ok(msg) = rx.recv() {
|
|
||||||
writeln!(stdout, "{}", serde_json::to_string(&msg).unwrap()).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
|
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
|
||||||
|
|
Loading…
Reference in a new issue