passes 6b, unreliably gossips writes
This commit is contained in:
parent
ac5477d1d0
commit
67a84aa3b6
1 changed files with 28 additions and 4 deletions
|
@ -41,8 +41,9 @@ struct TANode {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TANode {
|
impl TANode {
|
||||||
fn transact(&mut self, txn: &Txn) -> Txn {
|
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
|
||||||
let mut out = Txn::with_capacity(txn.len());
|
let mut out = Txn::with_capacity(txn.len());
|
||||||
|
let mut yo = Txn::new();
|
||||||
for txn_op in txn.iter() {
|
for txn_op in txn.iter() {
|
||||||
let op = txn_op.op.as_str();
|
let op = txn_op.op.as_str();
|
||||||
match op {
|
match op {
|
||||||
|
@ -57,6 +58,7 @@ impl TANode {
|
||||||
}
|
}
|
||||||
"w" => {
|
"w" => {
|
||||||
self.store.insert(txn_op.key, txn_op.value.unwrap());
|
self.store.insert(txn_op.key, txn_op.value.unwrap());
|
||||||
|
yo.push(txn_op.clone());
|
||||||
out.push(txn_op.clone());
|
out.push(txn_op.clone());
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
@ -65,7 +67,13 @@ impl TANode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
out
|
(out, yo)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_yo(&mut self, txn: &Txn) {
|
||||||
|
for op in txn.iter() {
|
||||||
|
self.store.insert(op.key, op.value.unwrap());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,12 +85,17 @@ impl Node for TANode {
|
||||||
"txn" => {
|
"txn" => {
|
||||||
let txn = get_txn(&msg);
|
let txn = get_txn(&msg);
|
||||||
|
|
||||||
let txn = self.transact(&txn);
|
let (txn, yo) = self.transact(&txn);
|
||||||
|
gossip(runner, &yo);
|
||||||
let payload = mk_payload(&[("txn", txn.serialize())]);
|
let payload = mk_payload(&[("txn", txn.serialize())]);
|
||||||
let body = Body::from_type("txn_ok").with_payload(payload);
|
let body = Body::from_type("txn_ok").with_payload(payload);
|
||||||
runner.reply(&msg, body);
|
runner.reply(&msg, body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"yo" => {
|
||||||
|
let txn = get_txn(&msg);
|
||||||
|
self.handle_yo(&txn);
|
||||||
|
}
|
||||||
_ => {
|
_ => {
|
||||||
eprintln!("unknown message type {typ}")
|
eprintln!("unknown message type {typ}")
|
||||||
}
|
}
|
||||||
|
@ -90,6 +103,17 @@ impl Node for TANode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn gossip(runner: &Runner, goss: &Txn) {
|
||||||
|
let payload = mk_payload(&[("txn", goss.serialize())]);
|
||||||
|
let body = Body::from_type("yo").with_payload(payload);
|
||||||
|
|
||||||
|
let slf = runner.node_id();
|
||||||
|
let nodes = runner.nodes().iter().filter(|e| e.as_str() != slf);
|
||||||
|
for node in nodes {
|
||||||
|
runner.send(node, body.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn get_txn(msg: &Message) -> Txn {
|
fn get_txn(msg: &Message) -> Txn {
|
||||||
msg.body
|
msg.body
|
||||||
.payload
|
.payload
|
||||||
|
|
Loading…
Reference in a new issue