be slightly more parsimonious with sending gossip
This commit is contained in:
parent
2badae0321
commit
6e9dfe881a
1 changed files with 23 additions and 23 deletions
|
@ -1,5 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{BTreeSet, HashMap, HashSet},
|
||||||
thread,
|
thread,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
@ -18,8 +18,7 @@ fn main() {
|
||||||
fn on_init(runner: &Runner) {
|
fn on_init(runner: &Runner) {
|
||||||
let tx = runner.get_backdoor();
|
let tx = runner.get_backdoor();
|
||||||
thread::spawn(move || loop {
|
thread::spawn(move || loop {
|
||||||
//let millis = rand::thread_rng().gen_range(200..=800);
|
let millis = 50;
|
||||||
let millis = 65;
|
|
||||||
thread::sleep(Duration::from_millis(millis));
|
thread::sleep(Duration::from_millis(millis));
|
||||||
let body = Body::from_type("do_gossip");
|
let body = Body::from_type("do_gossip");
|
||||||
let msg = Message {
|
let msg = Message {
|
||||||
|
@ -32,8 +31,8 @@ fn on_init(runner: &Runner) {
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
struct BCaster {
|
struct BCaster {
|
||||||
store: HashSet<i64>,
|
store: BTreeSet<i64>,
|
||||||
gossips: HashMap<String, HashSet<i64>>,
|
gossips: HashMap<String, BTreeSet<i64>>,
|
||||||
neighbors: Vec<String>,
|
neighbors: Vec<String>,
|
||||||
others: Vec<String>,
|
others: Vec<String>,
|
||||||
}
|
}
|
||||||
|
@ -52,26 +51,25 @@ impl BCaster {
|
||||||
|
|
||||||
fn gossip(&self, runner: &Runner) {
|
fn gossip(&self, runner: &Runner) {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let rng = &mut rng;
|
let neighbor = &self.neighbors[rng.gen_range(0..self.neighbors.len())];
|
||||||
for node in self.neighbors.iter() {
|
let goss = if let Some(ngoss) = self.gossips.get(neighbor) {
|
||||||
let mut goss = if let Some(neighbor) = self.gossips.get(node) {
|
self.store.difference(ngoss).copied().collect()
|
||||||
self.store.difference(neighbor).cloned().collect()
|
} else {
|
||||||
} else {
|
self.store.clone()
|
||||||
self.store.clone()
|
};
|
||||||
};
|
let goss: Vec<_> = goss.into_iter().collect();
|
||||||
for v in self.store.iter().filter(|_| rng.gen_bool(0.10)) {
|
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
||||||
goss.insert(*v);
|
let body = Body::from_type("gossip").with_payload(payload);
|
||||||
}
|
runner.send(neighbor, body);
|
||||||
if !goss.is_empty() && rng.gen_bool(0.6) {
|
|
||||||
let goss: Vec<_> = goss.into_iter().collect();
|
|
||||||
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
|
||||||
let body = Body::from_type("gossip").with_payload(payload);
|
|
||||||
runner.send(node, body.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// now the non-neighbor
|
||||||
let other = &self.others[rng.gen_range(0..self.others.len())];
|
let other = &self.others[rng.gen_range(0..self.others.len())];
|
||||||
let goss: Vec<_> = self.store.iter().cloned().collect();
|
let goss = if let Some(goss) = self.gossips.get(other) {
|
||||||
|
self.store.difference(goss).copied().collect()
|
||||||
|
} else {
|
||||||
|
self.store.clone()
|
||||||
|
};
|
||||||
|
let goss: Vec<_> = goss.into_iter().collect();
|
||||||
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
||||||
let body = Body::from_type("gossip").with_payload(payload);
|
let body = Body::from_type("gossip").with_payload(payload);
|
||||||
runner.send(other, body);
|
runner.send(other, body);
|
||||||
|
@ -88,10 +86,12 @@ impl Node for BCaster {
|
||||||
}
|
}
|
||||||
"broadcast" => {
|
"broadcast" => {
|
||||||
let val = req.body.payload.get("message").and_then(|v| v.as_i64());
|
let val = req.body.payload.get("message").and_then(|v| v.as_i64());
|
||||||
|
let sender = req.src.clone();
|
||||||
if let Some(val) = val {
|
if let Some(val) = val {
|
||||||
self.store.insert(val);
|
self.store.insert(val);
|
||||||
let body = Body::from_type("broadcast_ok");
|
let body = Body::from_type("broadcast_ok");
|
||||||
runner.reply(&req, body);
|
runner.reply(&req, body);
|
||||||
|
self.gossips.entry(sender).or_default().insert(val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"read" => {
|
"read" => {
|
||||||
|
|
Loading…
Reference in a new issue