Compare commits
2 commits
5e3e269785
...
67a84aa3b6
Author | SHA1 | Date | |
---|---|---|---|
|
67a84aa3b6 | ||
|
ac5477d1d0 |
4 changed files with 153 additions and 1 deletions
8
Cargo.lock
generated
8
Cargo.lock
generated
|
@ -53,6 +53,14 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "gg-txn"
|
||||||
|
version = "0.0.1"
|
||||||
|
dependencies = [
|
||||||
|
"nebkor-maelstrom",
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "gg-uid"
|
name = "gg-uid"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka"]
|
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka", "gg-txn"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|
10
gg-txn/Cargo.toml
Normal file
10
gg-txn/Cargo.toml
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
[package]
|
||||||
|
name = "gg-txn"
|
||||||
|
edition = "2021"
|
||||||
|
version.workspace = true
|
||||||
|
authors.workspace = true
|
||||||
|
license-file.workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
nebkor-maelstrom.workspace = true
|
||||||
|
serde_json.workspace = true
|
134
gg-txn/src/main.rs
Normal file
134
gg-txn/src/main.rs
Normal file
|
@ -0,0 +1,134 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
type Txn = Vec<TxnOp>;
|
||||||
|
|
||||||
|
trait TxnSer {
|
||||||
|
fn serialize(&self) -> Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxnSer for Txn {
|
||||||
|
fn serialize(&self) -> Value {
|
||||||
|
let out: Vec<Value> = self
|
||||||
|
.iter()
|
||||||
|
.map(|t| {
|
||||||
|
let v: Vec<Value> = vec![t.op.clone().into(), t.key.into(), t.value.into()];
|
||||||
|
v.into()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
out.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let node = TANode::default();
|
||||||
|
let runner = Runner::new(node);
|
||||||
|
runner.run(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
|
||||||
|
struct TxnOp {
|
||||||
|
op: String,
|
||||||
|
key: u64,
|
||||||
|
value: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct TANode {
|
||||||
|
store: HashMap<u64, u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TANode {
|
||||||
|
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
|
||||||
|
let mut out = Txn::with_capacity(txn.len());
|
||||||
|
let mut yo = Txn::new();
|
||||||
|
for txn_op in txn.iter() {
|
||||||
|
let op = txn_op.op.as_str();
|
||||||
|
match op {
|
||||||
|
"r" => {
|
||||||
|
let key = txn_op.key;
|
||||||
|
let op = TxnOp {
|
||||||
|
op: "r".into(),
|
||||||
|
key,
|
||||||
|
value: self.store.get(&key).cloned(),
|
||||||
|
};
|
||||||
|
out.push(op);
|
||||||
|
}
|
||||||
|
"w" => {
|
||||||
|
self.store.insert(txn_op.key, txn_op.value.unwrap());
|
||||||
|
yo.push(txn_op.clone());
|
||||||
|
out.push(txn_op.clone());
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
eprintln!("unknown TxnOp op {op}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(out, yo)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_yo(&mut self, txn: &Txn) {
|
||||||
|
for op in txn.iter() {
|
||||||
|
self.store.insert(op.key, op.value.unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Node for TANode {
|
||||||
|
fn handle(&mut self, runner: &Runner, msg: Message) {
|
||||||
|
let typ = msg.typ();
|
||||||
|
|
||||||
|
match typ {
|
||||||
|
"txn" => {
|
||||||
|
let txn = get_txn(&msg);
|
||||||
|
|
||||||
|
let (txn, yo) = self.transact(&txn);
|
||||||
|
gossip(runner, &yo);
|
||||||
|
let payload = mk_payload(&[("txn", txn.serialize())]);
|
||||||
|
let body = Body::from_type("txn_ok").with_payload(payload);
|
||||||
|
runner.reply(&msg, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
"yo" => {
|
||||||
|
let txn = get_txn(&msg);
|
||||||
|
self.handle_yo(&txn);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
eprintln!("unknown message type {typ}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gossip(runner: &Runner, goss: &Txn) {
|
||||||
|
let payload = mk_payload(&[("txn", goss.serialize())]);
|
||||||
|
let body = Body::from_type("yo").with_payload(payload);
|
||||||
|
|
||||||
|
let slf = runner.node_id();
|
||||||
|
let nodes = runner.nodes().iter().filter(|e| e.as_str() != slf);
|
||||||
|
for node in nodes {
|
||||||
|
runner.send(node, body.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_txn(msg: &Message) -> Txn {
|
||||||
|
msg.body
|
||||||
|
.payload
|
||||||
|
.get("txn")
|
||||||
|
.unwrap()
|
||||||
|
.as_array()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.map(|e| {
|
||||||
|
let o = e.as_array().unwrap();
|
||||||
|
TxnOp {
|
||||||
|
op: o[0].as_str().unwrap().to_string(),
|
||||||
|
key: o[1].as_u64().unwrap(),
|
||||||
|
value: o[2].as_u64(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
Loading…
Reference in a new issue