diff --git a/gg-txn/src/main.rs b/gg-txn/src/main.rs index b751400..86e4594 100644 --- a/gg-txn/src/main.rs +++ b/gg-txn/src/main.rs @@ -41,8 +41,9 @@ struct TANode { } impl TANode { - fn transact(&mut self, txn: &Txn) -> Txn { + fn transact(&mut self, txn: &Txn) -> (Txn, Txn) { let mut out = Txn::with_capacity(txn.len()); + let mut yo = Txn::new(); for txn_op in txn.iter() { let op = txn_op.op.as_str(); match op { @@ -57,6 +58,7 @@ impl TANode { } "w" => { self.store.insert(txn_op.key, txn_op.value.unwrap()); + yo.push(txn_op.clone()); out.push(txn_op.clone()); } _ => { @@ -65,7 +67,13 @@ impl TANode { } } - out + (out, yo) + } + + fn handle_yo(&mut self, txn: &Txn) { + for op in txn.iter() { + self.store.insert(op.key, op.value.unwrap()); + } } } @@ -77,12 +85,17 @@ impl Node for TANode { "txn" => { let txn = get_txn(&msg); - let txn = self.transact(&txn); - + let (txn, yo) = self.transact(&txn); + gossip(runner, &yo); let payload = mk_payload(&[("txn", txn.serialize())]); let body = Body::from_type("txn_ok").with_payload(payload); runner.reply(&msg, body); } + + "yo" => { + let txn = get_txn(&msg); + self.handle_yo(&txn); + } _ => { eprintln!("unknown message type {typ}") } @@ -90,6 +103,17 @@ impl Node for TANode { } } +fn gossip(runner: &Runner, goss: &Txn) { + let payload = mk_payload(&[("txn", goss.serialize())]); + let body = Body::from_type("yo").with_payload(payload); + + let slf = runner.node_id(); + let nodes = runner.nodes().iter().filter(|e| e.as_str() != slf); + for node in nodes { + runner.send(node, body.clone()); + } +} + fn get_txn(msg: &Message) -> Txn { msg.body .payload