Compare commits
3 commits
461087023d
...
e6d76ba37c
Author | SHA1 | Date | |
---|---|---|---|
|
e6d76ba37c | ||
|
c53edd921a | ||
|
8638ffe67f |
6 changed files with 94 additions and 53 deletions
|
@ -42,7 +42,7 @@ fn main() {
|
||||||
tx.send(msg).unwrap();
|
tx.send(msg).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
runner.run(rx);
|
runner.run(rx, None);
|
||||||
let _ = i.join();
|
let _ = i.join();
|
||||||
let _ = g.join();
|
let _ = g.join();
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,6 @@ fn main() {
|
||||||
|
|
||||||
let runner = Runner::new(node, out);
|
let runner = Runner::new(node, out);
|
||||||
|
|
||||||
runner.run(rx);
|
runner.run(rx, None);
|
||||||
i.join().unwrap();
|
i.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +1,32 @@
|
||||||
use std::{
|
use std::{
|
||||||
|
rc::Rc,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
thread,
|
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner};
|
use nebkor_maelstrom::{mk_payload, mk_stdin, Body, Message, Node, Runner};
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let out = std::io::stdout();
|
|
||||||
|
|
||||||
let node = Counter;
|
let node = Counter;
|
||||||
let node = Arc::new(Mutex::new(node));
|
let node = Arc::new(Mutex::new(node));
|
||||||
|
|
||||||
let runner = Runner::new(node, out);
|
let runner = Rc::new(Runner::new(node));
|
||||||
|
|
||||||
let (i, extra_input, rx) = mk_stdin();
|
let (i, _, rx) = mk_stdin();
|
||||||
let init = thread::spawn(move || {
|
|
||||||
thread::sleep(Duration::from_millis(10));
|
|
||||||
extra_input
|
|
||||||
.send(Message {
|
|
||||||
body: Body::from_type("kv_init"),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
runner.run(rx);
|
let on_init = |rnr: &Runner| {
|
||||||
init.join().unwrap();
|
let payload = mk_payload(&[
|
||||||
|
("key", KEY.into()),
|
||||||
|
("from", 0i64.into()),
|
||||||
|
("to", 0i64.into()),
|
||||||
|
("create_if_not_exists", true.into()),
|
||||||
|
]);
|
||||||
|
let body = Body::from_type("cas").with_payload(payload);
|
||||||
|
rnr.send("seq-kv", body);
|
||||||
|
};
|
||||||
|
|
||||||
|
let on_init = Box::new(on_init);
|
||||||
|
|
||||||
|
runner.run(rx, Some(on_init));
|
||||||
i.join().unwrap();
|
i.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,24 +36,36 @@ const KEY: &str = "COUNTER";
|
||||||
struct Counter;
|
struct Counter;
|
||||||
|
|
||||||
impl Node for Counter {
|
impl Node for Counter {
|
||||||
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
|
fn handle<'slf>(&'slf mut self, runner: &'slf Rc<Runner>, req: Message) {
|
||||||
let typ = req.body.typ.as_str();
|
let typ = req.body.typ.as_str();
|
||||||
let frm = req.src.clone();
|
let frm = req.src.clone();
|
||||||
let msg_id = req.body.msg_id.to_owned();
|
let msg_id = req.body.msg_id.to_owned();
|
||||||
|
|
||||||
match typ {
|
match typ {
|
||||||
"kv_init" => {
|
|
||||||
let payload = mk_payload(&[
|
|
||||||
("key", KEY.into()),
|
|
||||||
("from", 0i64.into()),
|
|
||||||
("to", 0i64.into()),
|
|
||||||
("create_if_not_exists", true.into()),
|
|
||||||
]);
|
|
||||||
let body = Body::from_type("cas").with_payload(payload);
|
|
||||||
runner.send("seq-kv", body);
|
|
||||||
}
|
|
||||||
"add" => {
|
"add" => {
|
||||||
runner.reply(&req, Body::from_type("add_ok"));
|
let read_runner = runner.clone();
|
||||||
|
let cas_runner = runner.clone();
|
||||||
|
let add_req = req.clone();
|
||||||
|
|
||||||
|
let cas_handler = move |_msg: Message| {
|
||||||
|
let req = add_req;
|
||||||
|
cas_runner.reply(&req, Body::from_type("add_ok"));
|
||||||
|
};
|
||||||
|
|
||||||
|
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
|
||||||
|
let read_handler = move |msg: Message| {
|
||||||
|
let value = msg.body.payload.get("value").unwrap().as_i64().unwrap();
|
||||||
|
let payload = mk_payload(&[
|
||||||
|
("from", value.into()),
|
||||||
|
("to", (value + delta).into()),
|
||||||
|
("key", KEY.into()),
|
||||||
|
]);
|
||||||
|
let body = Body::from_type("cas").with_payload(payload);
|
||||||
|
read_runner.rpc("seq-kv", body, Box::new(cas_handler));
|
||||||
|
};
|
||||||
|
// kick it off by calling "read" on seq-kv:
|
||||||
|
let payload = mk_payload(&[("key", KEY.into())]);
|
||||||
|
let body = Body::from_type("read").with_payload(payload);
|
||||||
|
runner.rpc("seq-kv", body, Box::new(read_handler));
|
||||||
}
|
}
|
||||||
"read" => {
|
"read" => {
|
||||||
let rn = runner.clone();
|
let rn = runner.clone();
|
||||||
|
|
|
@ -26,7 +26,7 @@ fn main() {
|
||||||
|
|
||||||
let runner = Runner::new(node, out);
|
let runner = Runner::new(node, out);
|
||||||
|
|
||||||
runner.run(rx);
|
runner.run(rx, None);
|
||||||
i.join().unwrap();
|
i.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
/*
|
/*
|
||||||
|
use std::{rc::Rc, sync::Arc};
|
||||||
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::Body;
|
use crate::{mk_payload, Body, Message, Result, Runner};
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
#[derive(Debug, Default, Clone)]
|
||||||
pub struct Kv {
|
pub struct Kv {
|
||||||
|
@ -9,20 +11,40 @@ pub struct Kv {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Kv {
|
impl Kv {
|
||||||
pub fn new(service: &str) -> Self {
|
pub fn seq() -> Self {
|
||||||
Kv {
|
Kv {
|
||||||
service: service.to_string(),
|
service: "seq-kv".to_string(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn read(&self, key: &str) -> Body {
|
|
||||||
|
pub fn lin() -> Self {
|
||||||
|
Kv {
|
||||||
|
service: "lin-kv".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn lww() -> Self {
|
||||||
|
Kv {
|
||||||
|
service: "lww-kv".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read(&self, runner: &Runner, key: &str) -> Result<Value> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write(&self, key: &str, val: Value, create: bool) -> Body {
|
pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cas(&self, key: &str, from: Value, to: Value, create: bool) -> Body {
|
pub fn cas(
|
||||||
|
&self,
|
||||||
|
runner: &Runner,
|
||||||
|
key: &str,
|
||||||
|
from: Value,
|
||||||
|
to: Value,
|
||||||
|
create: bool,
|
||||||
|
) -> Result<()> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,63 +1,70 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
io::{BufRead, Stdout, Write},
|
io::{BufRead, Stdout, Write},
|
||||||
|
rc::Rc,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||||
mpsc::{channel, Receiver, Sender},
|
mpsc::{channel, Receiver, Sender},
|
||||||
Arc, Mutex, OnceLock,
|
Arc, LazyLock, Mutex, OnceLock,
|
||||||
},
|
},
|
||||||
thread::{self, JoinHandle},
|
thread::{self, JoinHandle},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
|
use protocol::ErrorCode;
|
||||||
pub use protocol::{Body, Message, Payload};
|
pub use protocol::{Body, Message, Payload};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
pub mod kv;
|
pub mod kv;
|
||||||
|
|
||||||
pub type DynNode = Arc<Mutex<dyn Node>>;
|
pub type DynNode = Arc<Mutex<dyn Node>>;
|
||||||
pub type Handler = Box<dyn FnMut(Message)>;
|
pub type Handler = Box<dyn FnOnce(Message)>; // -> Result<Option<Value>>>;
|
||||||
|
pub type OnInit = Box<dyn Fn(&Runner)>;
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, ErrorCode>;
|
||||||
|
|
||||||
|
static MSG_ID: AtomicU64 = AtomicU64::new(0);
|
||||||
|
static OUTPUT: LazyLock<Stdout> = LazyLock::new(std::io::stdout);
|
||||||
|
|
||||||
pub trait Node {
|
pub trait Node {
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message);
|
fn handle(&mut self, runner: &Rc<Runner>, msg: Message);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Runner {
|
pub struct Runner {
|
||||||
msg_id: Arc<AtomicU64>,
|
|
||||||
node: DynNode,
|
node: DynNode,
|
||||||
node_id: OnceLock<String>,
|
node_id: OnceLock<String>,
|
||||||
nodes: OnceLock<Vec<String>>,
|
nodes: OnceLock<Vec<String>>,
|
||||||
steps: Arc<AtomicUsize>,
|
steps: Arc<AtomicUsize>,
|
||||||
output: Arc<Mutex<Stdout>>,
|
|
||||||
handlers: Arc<Mutex<HashMap<u64, Handler>>>,
|
handlers: Arc<Mutex<HashMap<u64, Handler>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runner {
|
impl Runner {
|
||||||
pub fn new(node: DynNode, output: Stdout) -> Self {
|
pub fn new(node: DynNode) -> Self {
|
||||||
Runner {
|
Runner {
|
||||||
node,
|
node,
|
||||||
msg_id: Arc::new(AtomicU64::new(1)),
|
|
||||||
nodes: OnceLock::new(),
|
nodes: OnceLock::new(),
|
||||||
node_id: OnceLock::new(),
|
node_id: OnceLock::new(),
|
||||||
steps: Arc::new(AtomicUsize::new(0)),
|
steps: Arc::new(AtomicUsize::new(0)),
|
||||||
output: Arc::new(Mutex::new(output)),
|
|
||||||
handlers: Default::default(),
|
handlers: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&self, input: Receiver<Message>) {
|
pub fn run(self: &Rc<Self>, input: Receiver<Message>, on_init: Option<OnInit>) {
|
||||||
for msg in input.iter() {
|
for msg in input.iter() {
|
||||||
let typ = &msg.body.typ;
|
let typ = &msg.body.typ;
|
||||||
if let "init" = typ.as_str() {
|
if let "init" = typ.as_str() {
|
||||||
self.init(&msg);
|
self.init(&msg);
|
||||||
let body = Body::from_type("init_ok");
|
let body = Body::from_type("init_ok");
|
||||||
self.reply(&msg, body);
|
self.reply(&msg, body);
|
||||||
|
if let Some(ref h) = on_init {
|
||||||
|
h(self);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
let irt = msg.body.in_reply_to;
|
let irt = msg.body.in_reply_to;
|
||||||
{
|
{
|
||||||
let mut g = self.handlers.lock().unwrap();
|
let mut g = self.handlers.lock().unwrap();
|
||||||
if let Some(mut h) = g.remove(&irt) {
|
if let Some(h) = g.remove(&irt) {
|
||||||
h(msg.clone());
|
h(msg.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -83,11 +90,11 @@ impl Runner {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next_msg_id(&self) -> u64 {
|
pub fn next_msg_id(&self) -> u64 {
|
||||||
self.msg_id.fetch_add(1, Ordering::SeqCst)
|
MSG_ID.fetch_add(1, Ordering::SeqCst)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cur_msg_id(&self) -> u64 {
|
pub fn cur_msg_id(&self) -> u64 {
|
||||||
self.msg_id.load(Ordering::SeqCst)
|
MSG_ID.load(Ordering::SeqCst)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn nodes(&self) -> &[String] {
|
pub fn nodes(&self) -> &[String] {
|
||||||
|
@ -146,7 +153,7 @@ impl Runner {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn writeln(&self, msg: &str) {
|
fn writeln(&self, msg: &str) {
|
||||||
let mut out = self.output.lock().unwrap();
|
let mut out = OUTPUT.lock();
|
||||||
let msg = format!("{msg}\n");
|
let msg = format!("{msg}\n");
|
||||||
out.write_all(msg.as_bytes()).unwrap();
|
out.write_all(msg.as_bytes()).unwrap();
|
||||||
out.flush().unwrap();
|
out.flush().unwrap();
|
||||||
|
@ -163,7 +170,7 @@ pub fn mk_stdin() -> (JoinHandle<()>, Sender<Message>, Receiver<Message>) {
|
||||||
|
|
||||||
let join = thread::spawn(move || {
|
let join = thread::spawn(move || {
|
||||||
let g = stdin.lock();
|
let g = stdin.lock();
|
||||||
for line in g.lines().map_while(Result::ok) {
|
for line in g.lines().map_while(std::result::Result::ok) {
|
||||||
if let Ok(msg) = serde_json::from_str(&line) {
|
if let Ok(msg) = serde_json::from_str(&line) {
|
||||||
tx.send(msg).unwrap();
|
tx.send(msg).unwrap();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue