Compare commits
No commits in common. "67a84aa3b6b4d90c860fa01399082007e48bd4f8" and "5e3e26978506ddbd6572c535cfa8724ee844dfe7" have entirely different histories.
67a84aa3b6
...
5e3e269785
4 changed files with 1 additions and 153 deletions
8
Cargo.lock
generated
8
Cargo.lock
generated
|
@ -53,14 +53,6 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "gg-txn"
|
|
||||||
version = "0.0.1"
|
|
||||||
dependencies = [
|
|
||||||
"nebkor-maelstrom",
|
|
||||||
"serde_json",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "gg-uid"
|
name = "gg-uid"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka", "gg-txn"]
|
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|
|
@ -1,10 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "gg-txn"
|
|
||||||
edition = "2021"
|
|
||||||
version.workspace = true
|
|
||||||
authors.workspace = true
|
|
||||||
license-file.workspace = true
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
nebkor-maelstrom.workspace = true
|
|
||||||
serde_json.workspace = true
|
|
|
@ -1,134 +0,0 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
type Txn = Vec<TxnOp>;
|
|
||||||
|
|
||||||
trait TxnSer {
|
|
||||||
fn serialize(&self) -> Value;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TxnSer for Txn {
|
|
||||||
fn serialize(&self) -> Value {
|
|
||||||
let out: Vec<Value> = self
|
|
||||||
.iter()
|
|
||||||
.map(|t| {
|
|
||||||
let v: Vec<Value> = vec![t.op.clone().into(), t.key.into(), t.value.into()];
|
|
||||||
v.into()
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
out.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
let node = TANode::default();
|
|
||||||
let runner = Runner::new(node);
|
|
||||||
runner.run(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
|
|
||||||
struct TxnOp {
|
|
||||||
op: String,
|
|
||||||
key: u64,
|
|
||||||
value: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Default)]
|
|
||||||
struct TANode {
|
|
||||||
store: HashMap<u64, u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TANode {
|
|
||||||
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
|
|
||||||
let mut out = Txn::with_capacity(txn.len());
|
|
||||||
let mut yo = Txn::new();
|
|
||||||
for txn_op in txn.iter() {
|
|
||||||
let op = txn_op.op.as_str();
|
|
||||||
match op {
|
|
||||||
"r" => {
|
|
||||||
let key = txn_op.key;
|
|
||||||
let op = TxnOp {
|
|
||||||
op: "r".into(),
|
|
||||||
key,
|
|
||||||
value: self.store.get(&key).cloned(),
|
|
||||||
};
|
|
||||||
out.push(op);
|
|
||||||
}
|
|
||||||
"w" => {
|
|
||||||
self.store.insert(txn_op.key, txn_op.value.unwrap());
|
|
||||||
yo.push(txn_op.clone());
|
|
||||||
out.push(txn_op.clone());
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
eprintln!("unknown TxnOp op {op}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(out, yo)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_yo(&mut self, txn: &Txn) {
|
|
||||||
for op in txn.iter() {
|
|
||||||
self.store.insert(op.key, op.value.unwrap());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Node for TANode {
|
|
||||||
fn handle(&mut self, runner: &Runner, msg: Message) {
|
|
||||||
let typ = msg.typ();
|
|
||||||
|
|
||||||
match typ {
|
|
||||||
"txn" => {
|
|
||||||
let txn = get_txn(&msg);
|
|
||||||
|
|
||||||
let (txn, yo) = self.transact(&txn);
|
|
||||||
gossip(runner, &yo);
|
|
||||||
let payload = mk_payload(&[("txn", txn.serialize())]);
|
|
||||||
let body = Body::from_type("txn_ok").with_payload(payload);
|
|
||||||
runner.reply(&msg, body);
|
|
||||||
}
|
|
||||||
|
|
||||||
"yo" => {
|
|
||||||
let txn = get_txn(&msg);
|
|
||||||
self.handle_yo(&txn);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
eprintln!("unknown message type {typ}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gossip(runner: &Runner, goss: &Txn) {
|
|
||||||
let payload = mk_payload(&[("txn", goss.serialize())]);
|
|
||||||
let body = Body::from_type("yo").with_payload(payload);
|
|
||||||
|
|
||||||
let slf = runner.node_id();
|
|
||||||
let nodes = runner.nodes().iter().filter(|e| e.as_str() != slf);
|
|
||||||
for node in nodes {
|
|
||||||
runner.send(node, body.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_txn(msg: &Message) -> Txn {
|
|
||||||
msg.body
|
|
||||||
.payload
|
|
||||||
.get("txn")
|
|
||||||
.unwrap()
|
|
||||||
.as_array()
|
|
||||||
.unwrap()
|
|
||||||
.iter()
|
|
||||||
.map(|e| {
|
|
||||||
let o = e.as_array().unwrap();
|
|
||||||
TxnOp {
|
|
||||||
op: o[0].as_str().unwrap().to_string(),
|
|
||||||
key: o[1].as_u64().unwrap(),
|
|
||||||
value: o[2].as_u64(),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
|
Loading…
Reference in a new issue