Compare commits

..

No commits in common. "67a84aa3b6b4d90c860fa01399082007e48bd4f8" and "5e3e26978506ddbd6572c535cfa8724ee844dfe7" have entirely different histories.

4 changed files with 1 additions and 153 deletions

8
Cargo.lock generated
View file

@ -53,14 +53,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "gg-txn"
version = "0.0.1"
dependencies = [
"nebkor-maelstrom",
"serde_json",
]
[[package]]
name = "gg-uid"
version = "0.0.1"

View file

@ -1,5 +1,5 @@
[workspace]
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka", "gg-txn"]
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka"]
resolver = "2"
[workspace.package]

View file

@ -1,10 +0,0 @@
[package]
name = "gg-txn"
edition = "2021"
version.workspace = true
authors.workspace = true
license-file.workspace = true
[dependencies]
nebkor-maelstrom.workspace = true
serde_json.workspace = true

View file

@ -1,134 +0,0 @@
use std::collections::HashMap;
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
use serde_json::Value;
type Txn = Vec<TxnOp>;
trait TxnSer {
fn serialize(&self) -> Value;
}
impl TxnSer for Txn {
fn serialize(&self) -> Value {
let out: Vec<Value> = self
.iter()
.map(|t| {
let v: Vec<Value> = vec![t.op.clone().into(), t.key.into(), t.value.into()];
v.into()
})
.collect();
out.into()
}
}
fn main() {
let node = TANode::default();
let runner = Runner::new(node);
runner.run(None);
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
struct TxnOp {
op: String,
key: u64,
value: Option<u64>,
}
#[derive(Debug, Clone, Default)]
struct TANode {
store: HashMap<u64, u64>,
}
impl TANode {
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
let mut out = Txn::with_capacity(txn.len());
let mut yo = Txn::new();
for txn_op in txn.iter() {
let op = txn_op.op.as_str();
match op {
"r" => {
let key = txn_op.key;
let op = TxnOp {
op: "r".into(),
key,
value: self.store.get(&key).cloned(),
};
out.push(op);
}
"w" => {
self.store.insert(txn_op.key, txn_op.value.unwrap());
yo.push(txn_op.clone());
out.push(txn_op.clone());
}
_ => {
eprintln!("unknown TxnOp op {op}");
}
}
}
(out, yo)
}
fn handle_yo(&mut self, txn: &Txn) {
for op in txn.iter() {
self.store.insert(op.key, op.value.unwrap());
}
}
}
impl Node for TANode {
fn handle(&mut self, runner: &Runner, msg: Message) {
let typ = msg.typ();
match typ {
"txn" => {
let txn = get_txn(&msg);
let (txn, yo) = self.transact(&txn);
gossip(runner, &yo);
let payload = mk_payload(&[("txn", txn.serialize())]);
let body = Body::from_type("txn_ok").with_payload(payload);
runner.reply(&msg, body);
}
"yo" => {
let txn = get_txn(&msg);
self.handle_yo(&txn);
}
_ => {
eprintln!("unknown message type {typ}")
}
}
}
}
fn gossip(runner: &Runner, goss: &Txn) {
let payload = mk_payload(&[("txn", goss.serialize())]);
let body = Body::from_type("yo").with_payload(payload);
let slf = runner.node_id();
let nodes = runner.nodes().iter().filter(|e| e.as_str() != slf);
for node in nodes {
runner.send(node, body.clone());
}
}
fn get_txn(msg: &Message) -> Txn {
msg.body
.payload
.get("txn")
.unwrap()
.as_array()
.unwrap()
.iter()
.map(|e| {
let o = e.as_array().unwrap();
TxnOp {
op: o[0].as_str().unwrap().to_string(),
key: o[1].as_u64().unwrap(),
value: o[2].as_u64(),
}
})
.collect()
}