flatten runner code even more

This commit is contained in:
Joe Ardent 2024-06-04 16:00:30 -07:00
parent 23b95677bf
commit af6d2c0b27
2 changed files with 23 additions and 26 deletions

View file

@ -44,8 +44,8 @@ impl Node for Counter {
let v = kv.read(runner, KEY).unwrap().unwrap(); let v = kv.read(runner, KEY).unwrap().unwrap();
if kv.cas(runner, KEY, v.clone(), v.clone(), false).is_ok() { if kv.cas(runner, KEY, v.clone(), v.clone(), false).is_ok() {
rounds += 1; rounds += 1;
val = v;
} }
val = v;
} }
let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)])); let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)]));

View file

@ -50,9 +50,6 @@ impl Runner {
} }
pub fn run(&self, on_init: Option<OnInit>) { pub fn run(&self, on_init: Option<OnInit>) {
let (outbound_tx, outbound_rx) = channel();
let _ = self.outbound_tx.get_or_init(|| outbound_tx);
let (stdin_tx, stdin_rx) = channel(); let (stdin_tx, stdin_rx) = channel();
thread::spawn(move || { thread::spawn(move || {
let stdin = std::io::stdin().lock().lines(); let stdin = std::io::stdin().lock().lines();
@ -69,8 +66,16 @@ impl Runner {
} }
}); });
self.run_output(stdout_tx, outbound_rx); let (outbound_tx, outbound_rx) = channel();
self.run_input(stdin_rx, on_init); let _ = self.outbound_tx.get_or_init(|| outbound_tx);
thread::spawn(move || {
while let Ok(msg) = outbound_rx.recv() {
let msg = serde_json::to_string(&msg).unwrap();
stdout_tx.send(msg).unwrap();
}
});
self.process_input(stdin_rx, on_init);
} }
pub fn get_backdoor(&self) -> Sender<Message> { pub fn get_backdoor(&self) -> Sender<Message> {
@ -144,7 +149,7 @@ impl Runner {
let _ = self.nodes.get_or_init(|| nodes); let _ = self.nodes.get_or_init(|| nodes);
} }
fn run_input(&self, stdin_rx: Receiver<String>, on_init: Option<OnInit>) { fn process_input(&self, stdin_rx: Receiver<String>, on_init: Option<OnInit>) {
let (json_tx, json_rx) = channel(); let (json_tx, json_rx) = channel();
let _ = self.backdoor.get_or_init(|| json_tx.clone()); let _ = self.backdoor.get_or_init(|| json_tx.clone());
let proms = self.promises.clone(); let proms = self.promises.clone();
@ -160,28 +165,20 @@ impl Runner {
} }
}); });
for msg in json_rx { let msg = json_rx.recv().unwrap();
if msg.body.typ.as_str() == "init" { {
self.init(&msg); self.init(&msg);
let body = Body::from_type("init_ok"); let body = Body::from_type("init_ok");
self.reply(&msg, body); self.reply(&msg, body);
if let Some(ref on_init) = on_init { if let Some(on_init) = on_init {
on_init(self); on_init(self);
}
} else {
let mut node = self.node.lock().unwrap();
node.handle(self, msg);
} }
} }
}
fn run_output(&self, stdout_tx: Sender<String>, node_output_rx: Receiver<Message>) { let mut node = self.node.lock().unwrap();
thread::spawn(move || { for msg in json_rx {
while let Ok(msg) = node_output_rx.recv() { node.handle(self, msg);
let msg = serde_json::to_string(&msg).unwrap(); }
stdout_tx.send(msg).unwrap();
}
});
} }
fn mk_msg(&self, dest: &str, body: Body) -> Message { fn mk_msg(&self, dest: &str, body: Body) -> Message {