Add KV impl.

use kv in counter, still doesn't pass.
This commit is contained in:
Joe Ardent 2024-06-04 11:32:00 -07:00
parent e92af7cf92
commit 77316fa927
4 changed files with 63 additions and 76 deletions

View file

@ -1,4 +1,4 @@
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
use nebkor_maelstrom::{kv::Kv, mk_payload, Body, Message, Node, Runner};
const KEY: &str = "COUNTER";
@ -11,14 +11,8 @@ fn main() {
let runner = Runner::new(node);
let on_init = |rnr: &Runner| {
let payload = mk_payload(&[
("key", KEY.into()),
("from", 0i64.into()),
("to", 0i64.into()),
("create_if_not_exists", true.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
rnr.send("seq-kv", body);
let kv = Kv::seq();
let _ = kv.cas(rnr, KEY, 0i64.into(), 0i64.into(), true);
};
let on_init = Box::new(on_init);
@ -28,49 +22,22 @@ fn main() {
impl Node for Counter {
fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) {
let read_payload = mk_payload(&[("key", KEY.into())]);
let read_body = Body::from_type("read").with_payload(read_payload);
let typ = req.body.typ.as_str();
//let frm = req.src.clone();
//let msg_id = req.body.msg_id.to_owned();
let kv = Kv::seq();
match typ {
"add" => {
let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap();
let cur = runner
.rpc("seq-kv", read_body)
.recv()
.unwrap()
.body
.payload
.get("value")
.cloned()
.unwrap()
.as_i64()
.unwrap();
let cas_payload = mk_payload(&[
("key", KEY.into()),
("from", cur.into()),
("to", (cur + delta).into()),
]);
// ERRORS BE HERE
runner
.rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload))
.recv()
.unwrap();
loop {
let cur = kv.read(runner, KEY).unwrap().as_i64().unwrap();
match kv.cas(runner, KEY, cur.into(), (cur + delta).into(), false) {
Err(_) => {}
Ok(_) => break,
}
}
runner.reply(&req, Body::from_type("add_ok"));
}
"read" => {
let payload = mk_payload(&[("key", KEY.into())]);
let body = Body::from_type("read").with_payload(payload);
let val = runner
.rpc("seq-kv", body)
.recv()
.unwrap()
.body
.payload
.get("value")
.cloned()
.unwrap();
let val = kv.read(runner, KEY).unwrap();
let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)]));
runner.reply(&req, body);
}

View file

@ -1,40 +1,42 @@
/*
use std::{rc::Rc, sync::Arc};
use serde_json::Value;
use crate::{mk_payload, Body, Message, Result, Runner};
use crate::{check_err, mk_payload, protocol::ErrorCode, Body, RpcResult, Runner};
pub type ReadResult = std::result::Result<Value, ErrorCode>;
#[derive(Debug, Default, Clone)]
pub struct Kv {
pub service: String,
pub service: &'static str,
}
impl Kv {
pub fn seq() -> Self {
Kv {
service: "seq-kv".to_string(),
}
Kv { service: "seq-kv" }
}
pub fn lin() -> Self {
Kv {
service: "lin-kv".to_string(),
}
Kv { service: "lin-kv" }
}
pub fn lww() -> Self {
Kv {
service: "lww-kv".to_string(),
}
Kv { service: "lww-kv" }
}
pub fn read(&self, runner: &Runner, key: &str) -> Result<Value> {
todo!()
pub fn read(&self, runner: &Runner, key: &str) -> ReadResult {
let payload = mk_payload(&[("key", key.into())]);
let body = Body::from_type("read").with_payload(payload);
let rx = runner.rpc(self.service, body);
let msg = rx.recv().unwrap();
check_err(&msg)?;
Ok(msg.body.payload.get("value").unwrap().to_owned())
}
pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> {
todo!()
pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult {
let payload = mk_payload(&[("key", key.into()), ("value", val)]);
let body = Body::from_type("write").with_payload(payload);
let msg = runner.rpc(self.service, body).recv().unwrap();
check_err(&msg)?;
Ok(())
}
pub fn cas(
@ -44,8 +46,16 @@ impl Kv {
from: Value,
to: Value,
create: bool,
) -> Result<()> {
todo!()
) -> RpcResult {
let payload = mk_payload(&[
("key", key.into()),
("from", from),
("to", to),
("create_if_not_exists", create.into()),
]);
let body = Body::from_type("cas").with_payload(payload);
let msg = runner.rpc(self.service, body).recv().unwrap();
check_err(&msg)?;
Ok(())
}
}
*/

View file

@ -18,8 +18,8 @@ pub mod kv;
pub type NodeyNodeFace = Arc<Mutex<dyn Node>>;
pub type OnInit = Box<dyn Fn(&Runner)>;
pub type Result<T> = std::result::Result<T, ErrorCode>;
pub type RpcPromise = Receiver<Message>;
pub type RpcResult = std::result::Result<(), ErrorCode>;
pub trait Node {
fn handle(&mut self, runner: &Runner, msg: Message);
@ -198,6 +198,14 @@ impl Runner {
}
}
pub fn check_err(msg: &Message) -> RpcResult {
if msg.body.typ.as_str() == "error" {
let error = msg.body.code.unwrap();
return Err(error);
}
Ok(())
}
pub fn mk_payload(payload: &[(&str, Value)]) -> Payload {
payload
.iter()

View file

@ -21,6 +21,11 @@ pub struct Body {
pub in_reply_to: u64,
#[serde(flatten)]
pub payload: Payload,
// the following are for the case of errors
#[serde(default, skip_serializing_if = "Option::is_none")]
pub code: Option<ErrorCode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub text: Option<String>,
}
impl Body {
@ -56,14 +61,14 @@ pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body {
.with_in_reply_to(in_reply_to)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ErrorCode {
Definite(DefiniteError),
Indefinite(IndefiniteError),
}
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
#[repr(u64)]
pub enum DefiniteError {
NodeNotFound = 2,
@ -77,7 +82,7 @@ pub enum DefiniteError {
TxnConflict = 30,
}
#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)]
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
#[repr(u64)]
pub enum IndefiniteError {
Timeout = 0,
@ -89,12 +94,9 @@ pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>)
typ: "error".to_string(),
msg_id,
in_reply_to,
payload: [
("code".to_string(), serde_json::to_value(code).unwrap()),
("text".to_string(), serde_json::to_value(text).unwrap()),
]
.into_iter()
.collect(),
code: Some(code),
text: text.map(|t| t.to_string()),
payload: Default::default(),
}
}