remove Network struct
This commit is contained in:
parent
8109448dac
commit
e92af7cf92
6 changed files with 28 additions and 75 deletions
|
@ -1,6 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
sync::{Arc, Mutex},
|
|
||||||
thread,
|
thread,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
@ -10,7 +9,6 @@ use rand::Rng;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let node = BCaster::default();
|
let node = BCaster::default();
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node);
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||||
|
|
||||||
struct Echo;
|
struct Echo;
|
||||||
|
@ -17,8 +15,6 @@ impl Node for Echo {
|
||||||
fn main() {
|
fn main() {
|
||||||
let node = Echo;
|
let node = Echo;
|
||||||
|
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node);
|
||||||
|
|
||||||
runner.run(None);
|
runner.run(None);
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
||||||
|
|
||||||
const KEY: &str = "COUNTER";
|
const KEY: &str = "COUNTER";
|
||||||
|
@ -9,7 +7,6 @@ struct Counter;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let node = Counter;
|
let node = Counter;
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node);
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,7 @@
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
use nebkor_maelstrom::{protocol::Payload, Body, Message, Node, Runner};
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let node = GenUid;
|
let node = GenUid;
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node);
|
||||||
|
|
||||||
|
|
|
@ -11,8 +11,6 @@ distributed actors. It has three dependencies:
|
||||||
For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program:
|
For a simple example, see the [gg-echo](https://git.kittencollective.com/nebkor/chatty-catties/src/branch/main/gg-echo/src/main.rs) program:
|
||||||
|
|
||||||
``` rust
|
``` rust
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
use nebkor_maelstrom::{Body, Message, Node, Runner};
|
||||||
|
|
||||||
struct Echo;
|
struct Echo;
|
||||||
|
@ -30,8 +28,6 @@ impl Node for Echo {
|
||||||
fn main() {
|
fn main() {
|
||||||
let node = Echo;
|
let node = Echo;
|
||||||
|
|
||||||
let node = Arc::new(Mutex::new(node));
|
|
||||||
|
|
||||||
let runner = Runner::new(node);
|
let runner = Runner::new(node);
|
||||||
|
|
||||||
runner.run(None);
|
runner.run(None);
|
||||||
|
@ -44,9 +40,9 @@ Create a struct and implement `nebkor_maelstrom::Node` for it, which involves a
|
||||||
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
`handle(&mut self, &Runner, Message)`. This method is passed a `Runner` which contains methods like
|
||||||
`send`, `reply`, and `rpc`.
|
`send`, `reply`, and `rpc`.
|
||||||
|
|
||||||
In your main function, instantiate that struct and wrap it in an `Arc<Mutex<>>`, then pass that into
|
In your main function, instantiate that struct and pass that into `Runner::new()` to get a
|
||||||
`Runner::new()` to get a Runner. The `run()` method takes an optional closure that will be run when
|
Runner. The `run()` method takes an optional closure that will be run when the `init` Message is
|
||||||
the `init` Message is received; see the
|
received; see the
|
||||||
[broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/fff7fdc2d52f7d4c2d4d9c581ea16cdf0e1e3f30/gg-broadcast/src/main.rs#L18-L33)
|
[broadcast](https://git.kittencollective.com/nebkor/chatty-catties/src/commit/fff7fdc2d52f7d4c2d4d9c581ea16cdf0e1e3f30/gg-broadcast/src/main.rs#L18-L33)
|
||||||
program for an example of that, where it spawns a thread to send periodic messages to the node.
|
program for an example of that, where it spawns a thread to send periodic messages to the node.
|
||||||
|
|
||||||
|
|
|
@ -25,61 +25,33 @@ pub trait Node {
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message);
|
fn handle(&mut self, runner: &Runner, msg: Message);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Network {
|
|
||||||
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
|
||||||
node_output_tx: Sender<Message>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Network {
|
|
||||||
pub fn new() -> (Self, Receiver<Message>) {
|
|
||||||
let (node_output_tx, node_output_rx) = channel();
|
|
||||||
let net = Self {
|
|
||||||
node_output_tx,
|
|
||||||
promises: Arc::new(Mutex::new(HashMap::default())),
|
|
||||||
};
|
|
||||||
(net, node_output_rx)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send(&self, msg: Message) {
|
|
||||||
self.node_output_tx.send(msg).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn rpc(&self, msg: Message) -> RpcPromise {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
{
|
|
||||||
let msg_id = msg.body.msg_id;
|
|
||||||
let mut g = self.promises.lock().unwrap();
|
|
||||||
g.insert(msg_id, tx);
|
|
||||||
}
|
|
||||||
self.send(msg);
|
|
||||||
rx
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Runner {
|
pub struct Runner {
|
||||||
node: NodeyNodeFace,
|
node: NodeyNodeFace,
|
||||||
node_id: OnceLock<String>,
|
node_id: OnceLock<String>,
|
||||||
nodes: OnceLock<Vec<String>>,
|
nodes: OnceLock<Vec<String>>,
|
||||||
network: OnceLock<Network>,
|
|
||||||
backdoor: OnceLock<Sender<Message>>,
|
backdoor: OnceLock<Sender<Message>>,
|
||||||
|
promises: Arc<Mutex<HashMap<u64, Sender<Message>>>>,
|
||||||
|
outbound_tx: OnceLock<Sender<Message>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runner {
|
impl Runner {
|
||||||
pub fn new(node: NodeyNodeFace) -> Self {
|
pub fn new<N: Node + 'static>(node: N) -> Self {
|
||||||
|
let node = Arc::new(Mutex::new(node));
|
||||||
Runner {
|
Runner {
|
||||||
node,
|
node,
|
||||||
nodes: OnceLock::new(),
|
nodes: OnceLock::new(),
|
||||||
node_id: OnceLock::new(),
|
node_id: OnceLock::new(),
|
||||||
network: OnceLock::new(),
|
|
||||||
backdoor: OnceLock::new(),
|
backdoor: OnceLock::new(),
|
||||||
|
outbound_tx: OnceLock::new(),
|
||||||
|
promises: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&self, on_init: Option<OnInit>) {
|
pub fn run(&self, on_init: Option<OnInit>) {
|
||||||
let (stdin_tx, stdin_rx) = channel();
|
let (outbound_tx, outbound_rx) = channel();
|
||||||
let (stdout_tx, stdout_rx) = channel();
|
let _ = self.outbound_tx.get_or_init(|| outbound_tx);
|
||||||
|
|
||||||
|
let (stdin_tx, stdin_rx) = channel();
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let stdin = std::io::stdin().lock().lines();
|
let stdin = std::io::stdin().lock().lines();
|
||||||
for line in stdin {
|
for line in stdin {
|
||||||
|
@ -87,6 +59,7 @@ impl Runner {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let (stdout_tx, stdout_rx) = channel();
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let mut stdout = std::io::stdout().lock();
|
let mut stdout = std::io::stdout().lock();
|
||||||
for msg in stdout_rx {
|
for msg in stdout_rx {
|
||||||
|
@ -94,30 +67,19 @@ impl Runner {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
self.run_internal(stdout_tx, stdin_rx, on_init);
|
self.run_output(stdout_tx, outbound_rx);
|
||||||
|
self.run_input(stdin_rx, on_init);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_internal(
|
fn run_input(&self, stdin_rx: Receiver<String>, on_init: Option<OnInit>) {
|
||||||
&self,
|
|
||||||
stdout_tx: Sender<String>,
|
|
||||||
stdin_rx: Receiver<String>,
|
|
||||||
on_init: Option<OnInit>,
|
|
||||||
) {
|
|
||||||
let (network, node_receiver) = Network::new();
|
|
||||||
let _ = self.network.get_or_init(|| network.clone());
|
|
||||||
self.run_output(stdout_tx, node_receiver);
|
|
||||||
self.run_input(stdin_rx, network, on_init);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_input(&self, stdin_rx: Receiver<String>, network: Network, on_init: Option<OnInit>) {
|
|
||||||
let (json_tx, json_rx) = channel();
|
let (json_tx, json_rx) = channel();
|
||||||
let _ = self.backdoor.get_or_init(|| json_tx.clone());
|
let _ = self.backdoor.get_or_init(|| json_tx.clone());
|
||||||
|
let proms = self.promises.clone();
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
for line in stdin_rx {
|
for line in stdin_rx {
|
||||||
let msg: Message = serde_json::from_str(&line).unwrap();
|
let msg: Message = serde_json::from_str(&line).unwrap();
|
||||||
let irt = msg.body.in_reply_to;
|
let irt = msg.body.in_reply_to;
|
||||||
if let Some(tx) = network.promises.lock().unwrap().remove(&irt) {
|
if let Some(tx) = proms.lock().unwrap().remove(&irt) {
|
||||||
tx.send(msg).unwrap();
|
tx.send(msg).unwrap();
|
||||||
} else {
|
} else {
|
||||||
json_tx.send(msg).unwrap();
|
json_tx.send(msg).unwrap();
|
||||||
|
@ -126,7 +88,6 @@ impl Runner {
|
||||||
});
|
});
|
||||||
|
|
||||||
for msg in json_rx {
|
for msg in json_rx {
|
||||||
let mut node = self.node.lock().unwrap();
|
|
||||||
if msg.body.typ.as_str() == "init" {
|
if msg.body.typ.as_str() == "init" {
|
||||||
self.init(&msg);
|
self.init(&msg);
|
||||||
let body = Body::from_type("init_ok");
|
let body = Body::from_type("init_ok");
|
||||||
|
@ -135,6 +96,7 @@ impl Runner {
|
||||||
on_init(self);
|
on_init(self);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
let mut node = self.node.lock().unwrap();
|
||||||
node.handle(self, msg);
|
node.handle(self, msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -212,7 +174,7 @@ impl Runner {
|
||||||
dest: dest.to_string(),
|
dest: dest.to_string(),
|
||||||
body,
|
body,
|
||||||
};
|
};
|
||||||
self.network.get().unwrap().send(msg);
|
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
pub fn rpc(&self, dest: &str, body: Body) -> RpcPromise {
|
||||||
|
@ -225,7 +187,14 @@ impl Runner {
|
||||||
dest: dest.to_string(),
|
dest: dest.to_string(),
|
||||||
body,
|
body,
|
||||||
};
|
};
|
||||||
self.network.get().unwrap().rpc(msg)
|
let (tx, rx) = channel();
|
||||||
|
{
|
||||||
|
let msg_id = msg.body.msg_id;
|
||||||
|
let mut g = self.promises.lock().unwrap();
|
||||||
|
g.insert(msg_id, tx);
|
||||||
|
}
|
||||||
|
self.outbound_tx.get().unwrap().send(msg).unwrap();
|
||||||
|
rx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue