ensure aborted transactions don't get written
This commit is contained in:
parent
64153b7fd4
commit
350d24d284
1 changed files with 22 additions and 10 deletions
|
@ -1,6 +1,6 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use nebkor_maelstrom::{mk_payload, Body, Message, Node, Runner};
|
use nebkor_maelstrom::{mk_payload, Body, ErrorCode, Message, Node, Runner};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
type Txn = Vec<TxnOp>;
|
type Txn = Vec<TxnOp>;
|
||||||
|
@ -44,6 +44,7 @@ impl TANode {
|
||||||
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
|
fn transact(&mut self, txn: &Txn) -> (Txn, Txn) {
|
||||||
let mut out = Txn::with_capacity(txn.len());
|
let mut out = Txn::with_capacity(txn.len());
|
||||||
let mut yo = Txn::new();
|
let mut yo = Txn::new();
|
||||||
|
let mut new = HashMap::new();
|
||||||
for txn_op in txn.iter() {
|
for txn_op in txn.iter() {
|
||||||
let op = txn_op.op.as_str();
|
let op = txn_op.op.as_str();
|
||||||
match op {
|
match op {
|
||||||
|
@ -57,7 +58,7 @@ impl TANode {
|
||||||
out.push(op);
|
out.push(op);
|
||||||
}
|
}
|
||||||
"w" => {
|
"w" => {
|
||||||
self.store.insert(txn_op.key, txn_op.value.unwrap());
|
new.insert(txn_op.key, txn_op.value.unwrap());
|
||||||
yo.push(txn_op.clone());
|
yo.push(txn_op.clone());
|
||||||
out.push(txn_op.clone());
|
out.push(txn_op.clone());
|
||||||
}
|
}
|
||||||
|
@ -67,13 +68,13 @@ impl TANode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(out, yo)
|
// if the transaction were aborted, we'd not want to commit it by inserting from
|
||||||
}
|
// `new` here
|
||||||
|
for (k, v) in new.into_iter() {
|
||||||
fn handle_yo(&mut self, txn: &Txn) {
|
self.store.insert(k, v);
|
||||||
for op in txn.iter() {
|
|
||||||
self.store.insert(op.key, op.value.unwrap());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(out, yo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,15 +87,26 @@ impl Node for TANode {
|
||||||
let txn = get_txn(&msg);
|
let txn = get_txn(&msg);
|
||||||
|
|
||||||
let (txn, yo) = self.transact(&txn);
|
let (txn, yo) = self.transact(&txn);
|
||||||
|
// simulated aborted transaction for G1a-resistance
|
||||||
|
if txn.is_empty() {
|
||||||
|
let mut error = Body::from_type("error");
|
||||||
|
let ec =
|
||||||
|
ErrorCode::Definite(nebkor_maelstrom::protocol::DefiniteError::TxnConflict);
|
||||||
|
error.code = Some(ec);
|
||||||
|
error.text = Some("shit's fucked".into());
|
||||||
|
runner.reply(&msg, error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
gossip(runner, &yo);
|
gossip(runner, &yo);
|
||||||
let payload = mk_payload(&[("txn", txn.serialize())]);
|
let payload = mk_payload(&[("txn", txn.serialize())]);
|
||||||
let body = Body::from_type("txn_ok").with_payload(payload);
|
let body = Body::from_type("txn_ok").with_payload(payload);
|
||||||
runner.reply(&msg, body);
|
runner.reply(&msg, body);
|
||||||
}
|
}
|
||||||
|
|
||||||
"yo" => {
|
"yo" => {
|
||||||
let txn = get_txn(&msg);
|
let txn = get_txn(&msg);
|
||||||
self.handle_yo(&txn);
|
for op in txn.iter() {
|
||||||
|
self.store.insert(op.key, op.value.unwrap());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
eprintln!("unknown message type {typ}")
|
eprintln!("unknown message type {typ}")
|
||||||
|
|
Loading…
Reference in a new issue