From 77316fa927ddc0784c6d1e36c9713f3dff52b85f Mon Sep 17 00:00:00 2001 From: Joe Ardent Date: Tue, 4 Jun 2024 11:32:00 -0700 Subject: [PATCH] Add KV impl. use kv in counter, still doesn't pass. --- gg-g_counter/src/main.rs | 57 +++++++------------------------- nebkor-maelstrom/src/kv.rs | 52 +++++++++++++++++------------ nebkor-maelstrom/src/lib.rs | 10 +++++- nebkor-maelstrom/src/protocol.rs | 20 ++++++----- 4 files changed, 63 insertions(+), 76 deletions(-) diff --git a/gg-g_counter/src/main.rs b/gg-g_counter/src/main.rs index 4e5b32c..0505f8b 100644 --- a/gg-g_counter/src/main.rs +++ b/gg-g_counter/src/main.rs @@ -1,4 +1,4 @@ -use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner}; +use nebkor_maelstrom::{kv::Kv, mk_payload, Body, Message, Node, Runner}; const KEY: &str = "COUNTER"; @@ -11,14 +11,8 @@ fn main() { let runner = Runner::new(node); let on_init = |rnr: &Runner| { - let payload = mk_payload(&[ - ("key", KEY.into()), - ("from", 0i64.into()), - ("to", 0i64.into()), - ("create_if_not_exists", true.into()), - ]); - let body = Body::from_type("cas").with_payload(payload); - rnr.send("seq-kv", body); + let kv = Kv::seq(); + let _ = kv.cas(rnr, KEY, 0i64.into(), 0i64.into(), true); }; let on_init = Box::new(on_init); @@ -28,49 +22,22 @@ fn main() { impl Node for Counter { fn handle<'slf>(&'slf mut self, runner: &'slf Runner, req: Message) { - let read_payload = mk_payload(&[("key", KEY.into())]); - let read_body = Body::from_type("read").with_payload(read_payload); let typ = req.body.typ.as_str(); - //let frm = req.src.clone(); - //let msg_id = req.body.msg_id.to_owned(); + let kv = Kv::seq(); match typ { "add" => { let delta = req.body.payload.get("delta").unwrap().as_i64().unwrap(); - let cur = runner - .rpc("seq-kv", read_body) - .recv() - .unwrap() - .body - .payload - .get("value") - .cloned() - .unwrap() - .as_i64() - .unwrap(); - let cas_payload = mk_payload(&[ - ("key", KEY.into()), - ("from", cur.into()), - ("to", (cur + delta).into()), - ]); - // ERRORS BE HERE - runner - .rpc("seq-kv", Body::from_type("cas").with_payload(cas_payload)) - .recv() - .unwrap(); + loop { + let cur = kv.read(runner, KEY).unwrap().as_i64().unwrap(); + match kv.cas(runner, KEY, cur.into(), (cur + delta).into(), false) { + Err(_) => {} + Ok(_) => break, + } + } runner.reply(&req, Body::from_type("add_ok")); } "read" => { - let payload = mk_payload(&[("key", KEY.into())]); - let body = Body::from_type("read").with_payload(payload); - let val = runner - .rpc("seq-kv", body) - .recv() - .unwrap() - .body - .payload - .get("value") - .cloned() - .unwrap(); + let val = kv.read(runner, KEY).unwrap(); let body = Body::from_type("read_ok").with_payload(mk_payload(&[("value", val)])); runner.reply(&req, body); } diff --git a/nebkor-maelstrom/src/kv.rs b/nebkor-maelstrom/src/kv.rs index 704698f..4ad7bed 100644 --- a/nebkor-maelstrom/src/kv.rs +++ b/nebkor-maelstrom/src/kv.rs @@ -1,40 +1,42 @@ -/* -use std::{rc::Rc, sync::Arc}; - use serde_json::Value; -use crate::{mk_payload, Body, Message, Result, Runner}; +use crate::{check_err, mk_payload, protocol::ErrorCode, Body, RpcResult, Runner}; + +pub type ReadResult = std::result::Result; #[derive(Debug, Default, Clone)] pub struct Kv { - pub service: String, + pub service: &'static str, } impl Kv { pub fn seq() -> Self { - Kv { - service: "seq-kv".to_string(), - } + Kv { service: "seq-kv" } } pub fn lin() -> Self { - Kv { - service: "lin-kv".to_string(), - } + Kv { service: "lin-kv" } } pub fn lww() -> Self { - Kv { - service: "lww-kv".to_string(), - } + Kv { service: "lww-kv" } } - pub fn read(&self, runner: &Runner, key: &str) -> Result { - todo!() + pub fn read(&self, runner: &Runner, key: &str) -> ReadResult { + let payload = mk_payload(&[("key", key.into())]); + let body = Body::from_type("read").with_payload(payload); + let rx = runner.rpc(self.service, body); + let msg = rx.recv().unwrap(); + check_err(&msg)?; + Ok(msg.body.payload.get("value").unwrap().to_owned()) } - pub fn write(&self, runner: &Runner, key: &str, val: Value, create: bool) -> Result<()> { - todo!() + pub fn write(&self, runner: &Runner, key: &str, val: Value) -> RpcResult { + let payload = mk_payload(&[("key", key.into()), ("value", val)]); + let body = Body::from_type("write").with_payload(payload); + let msg = runner.rpc(self.service, body).recv().unwrap(); + check_err(&msg)?; + Ok(()) } pub fn cas( @@ -44,8 +46,16 @@ impl Kv { from: Value, to: Value, create: bool, - ) -> Result<()> { - todo!() + ) -> RpcResult { + let payload = mk_payload(&[ + ("key", key.into()), + ("from", from), + ("to", to), + ("create_if_not_exists", create.into()), + ]); + let body = Body::from_type("cas").with_payload(payload); + let msg = runner.rpc(self.service, body).recv().unwrap(); + check_err(&msg)?; + Ok(()) } } -*/ diff --git a/nebkor-maelstrom/src/lib.rs b/nebkor-maelstrom/src/lib.rs index 62dccde..0c7c286 100644 --- a/nebkor-maelstrom/src/lib.rs +++ b/nebkor-maelstrom/src/lib.rs @@ -18,8 +18,8 @@ pub mod kv; pub type NodeyNodeFace = Arc>; pub type OnInit = Box; -pub type Result = std::result::Result; pub type RpcPromise = Receiver; +pub type RpcResult = std::result::Result<(), ErrorCode>; pub trait Node { fn handle(&mut self, runner: &Runner, msg: Message); @@ -198,6 +198,14 @@ impl Runner { } } +pub fn check_err(msg: &Message) -> RpcResult { + if msg.body.typ.as_str() == "error" { + let error = msg.body.code.unwrap(); + return Err(error); + } + Ok(()) +} + pub fn mk_payload(payload: &[(&str, Value)]) -> Payload { payload .iter() diff --git a/nebkor-maelstrom/src/protocol.rs b/nebkor-maelstrom/src/protocol.rs index 40f8e74..6f35178 100644 --- a/nebkor-maelstrom/src/protocol.rs +++ b/nebkor-maelstrom/src/protocol.rs @@ -21,6 +21,11 @@ pub struct Body { pub in_reply_to: u64, #[serde(flatten)] pub payload: Payload, + // the following are for the case of errors + #[serde(default, skip_serializing_if = "Option::is_none")] + pub code: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub text: Option, } impl Body { @@ -56,14 +61,14 @@ pub fn init_ok(msg_id: u64, in_reply_to: u64) -> Body { .with_in_reply_to(in_reply_to) } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[serde(untagged)] pub enum ErrorCode { Definite(DefiniteError), Indefinite(IndefiniteError), } -#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)] +#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] #[repr(u64)] pub enum DefiniteError { NodeNotFound = 2, @@ -77,7 +82,7 @@ pub enum DefiniteError { TxnConflict = 30, } -#[derive(Debug, Clone, Serialize_repr, Deserialize_repr)] +#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] #[repr(u64)] pub enum IndefiniteError { Timeout = 0, @@ -89,12 +94,9 @@ pub fn error(msg_id: u64, in_reply_to: u64, code: ErrorCode, text: Option<&str>) typ: "error".to_string(), msg_id, in_reply_to, - payload: [ - ("code".to_string(), serde_json::to_value(code).unwrap()), - ("text".to_string(), serde_json::to_value(text).unwrap()), - ] - .into_iter() - .collect(), + code: Some(code), + text: text.map(|t| t.to_string()), + payload: Default::default(), } }