Compare commits
No commits in common. "67a84aa3b6b4d90c860fa01399082007e48bd4f8" and "5e3e26978506ddbd6572c535cfa8724ee844dfe7" have entirely different histories.
67a84aa3b6
...
5e3e269785
4 changed files with 1 additions and 153 deletions
8
Cargo.lock
generated
8
Cargo.lock
generated
|
@ -53,14 +53,6 @@ dependencies = [
|
|||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gg-txn"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"nebkor-maelstrom",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gg-uid"
|
||||
version = "0.0.1"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[workspace]
|
||||
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka", "gg-txn"]
|
||||
members = ["gg-echo", "gg-uid", "gg-broadcast", "gg-g_counter", "gg-kafka"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
|
|
|
@ -1,10 +0,0 @@
|
|||
[package]
|
||||
name = "gg-txn"
|
||||
edition = "2021"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
license-file.workspace = true
|
||||
|
||||
[dependencies]
|
||||
nebkor-maelstrom.workspace = true
|
||||
serde_json.workspace = true
|
|
@ -1,134 +0,0 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
||||
use serde_json::Value;
|
||||
|
||||
type Txn = Vec<TxnOp>;
|
||||
|
||||
trait TxnSer {
|
||||
fn serialize(&self) -> Value;
|
||||
}
|
||||
|
||||
impl TxnSer for Txn {
|
||||
fn serialize(&self) -> Value {
|
||||
let out: Vec<Value> = self
|
||||
.iter()
|
||||
.map(|t| {
|
||||
let v: Vec<Value> = vec![t.op.clone().into(), t.key.into(), t.value.into()];
|
||||
v.into()
|
||||
})
|
||||
.collect();
|
||||
out.into()
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let node = TANode::default();
|
||||
let runner = Runner::new(node);
|
||||
runner.run(None);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
|
||||
struct TxnOp {
|
||||
op: String,
|
||||
key: u64,
|
||||
value: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct TANode {
|
||||
store: HashMap<u64, u64>,
|
||||
}
|
||||
|
||||
impl TANode {
|
||||
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
|
||||
let mut out = Txn::with_capacity(txn.len());
|
||||
let mut yo = Txn::new();
|
||||
for txn_op in txn.iter() {
|
||||
let op = txn_op.op.as_str();
|
||||
match op {
|
||||
"r" => {
|
||||
let key = txn_op.key;
|
||||
let op = TxnOp {
|
||||
op: "r".into(),
|
||||
key,
|
||||
value: self.store.get(&key).cloned(),
|
||||
};
|
||||
out.push(op);
|
||||
}
|
||||
"w" => {
|
||||
self.store.insert(txn_op.key, txn_op.value.unwrap());
|
||||
yo.push(txn_op.clone());
|
||||
out.push(txn_op.clone());
|
||||
}
|
||||
_ => {
|
||||
eprintln!("unknown TxnOp op {op}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(out, yo)
|
||||
}
|
||||
|
||||
fn handle_yo(&mut self, txn: &Txn) {
|
||||
for op in txn.iter() {
|
||||
self.store.insert(op.key, op.value.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Node for TANode {
|
||||
fn handle(&mut self, runner: &Runner, msg: Message) {
|
||||
let typ = msg.typ();
|
||||
|
||||
match typ {
|
||||
"txn" => {
|
||||
let txn = get_txn(&msg);
|
||||
|
||||
let (txn, yo) = self.transact(&txn);
|
||||
gossip(runner, &yo);
|
||||
let payload = mk_payload(&[("txn", txn.serialize())]);
|
||||
let body = Body::from_type("txn_ok").with_payload(payload);
|
||||
runner.reply(&msg, body);
|
||||
}
|
||||
|
||||
"yo" => {
|
||||
let txn = get_txn(&msg);
|
||||
self.handle_yo(&txn);
|
||||
}
|
||||
_ => {
|
||||
eprintln!("unknown message type {typ}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn gossip(runner: &Runner, goss: &Txn) {
|
||||
let payload = mk_payload(&[("txn", goss.serialize())]);
|
||||
let body = Body::from_type("yo").with_payload(payload);
|
||||
|
||||
let slf = runner.node_id();
|
||||
let nodes = runner.nodes().iter().filter(|e| e.as_str() != slf);
|
||||
for node in nodes {
|
||||
runner.send(node, body.clone());
|
||||
}
|
||||
}
|
||||
|
||||
fn get_txn(msg: &Message) -> Txn {
|
||||
msg.body
|
||||
.payload
|
||||
.get("txn")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|e| {
|
||||
let o = e.as_array().unwrap();
|
||||
TxnOp {
|
||||
op: o[0].as_str().unwrap().to_string(),
|
||||
key: o[1].as_u64().unwrap(),
|
||||
value: o[2].as_u64(),
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
Loading…
Reference in a new issue