just gossip to one non-self node 2x as often, better latencies, same msgs/op
This commit is contained in:
parent
024005d59e
commit
47d7118f04
1 changed files with 22 additions and 35 deletions
|
@ -18,7 +18,7 @@ fn main() {
|
||||||
fn on_init(runner: &Runner) {
|
fn on_init(runner: &Runner) {
|
||||||
let tx = runner.get_backdoor();
|
let tx = runner.get_backdoor();
|
||||||
thread::spawn(move || loop {
|
thread::spawn(move || loop {
|
||||||
let millis = 50;
|
let millis = 25;
|
||||||
thread::sleep(Duration::from_millis(millis));
|
thread::sleep(Duration::from_millis(millis));
|
||||||
let body = Body::from_type("do_gossip");
|
let body = Body::from_type("do_gossip");
|
||||||
let msg = Message {
|
let msg = Message {
|
||||||
|
@ -33,45 +33,30 @@ fn on_init(runner: &Runner) {
|
||||||
struct BCaster {
|
struct BCaster {
|
||||||
store: BTreeSet<i64>,
|
store: BTreeSet<i64>,
|
||||||
gossips: HashMap<String, BTreeSet<i64>>,
|
gossips: HashMap<String, BTreeSet<i64>>,
|
||||||
neighbors: Vec<String>,
|
nodes: Vec<String>,
|
||||||
others: Vec<String>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BCaster {
|
impl BCaster {
|
||||||
fn topology(&mut self, topology: HashMap<String, Vec<String>>, id: &str) {
|
|
||||||
self.neighbors = topology[id].clone();
|
|
||||||
self.neighbors.sort_unstable();
|
|
||||||
for node in topology.keys() {
|
|
||||||
if !(node == id || self.neighbors.binary_search(node).is_ok()) {
|
|
||||||
self.others.push(node.to_owned());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gossip(&self, runner: &Runner) {
|
fn gossip(&self, runner: &Runner) {
|
||||||
let mut rng = rand::thread_rng();
|
if self.nodes.is_empty() {
|
||||||
let neighbor = &self.neighbors[rng.gen_range(0..self.neighbors.len())];
|
return;
|
||||||
let goss = if let Some(ngoss) = self.gossips.get(neighbor) {
|
}
|
||||||
self.store.difference(ngoss).copied().collect()
|
|
||||||
} else {
|
|
||||||
self.store.clone()
|
|
||||||
};
|
|
||||||
let goss: Vec<_> = goss.into_iter().collect();
|
|
||||||
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
|
||||||
let body = Body::from_type("gossip").with_payload(payload);
|
|
||||||
runner.send(neighbor, body);
|
|
||||||
|
|
||||||
// now the non-neighbor
|
let mut rng = rand::thread_rng();
|
||||||
let other = &self.others[rng.gen_range(0..self.others.len())];
|
let dest = &self.nodes[rng.gen_range(0..self.nodes.len())];
|
||||||
let goss = if let Some(goss) = self.gossips.get(other) {
|
let goss = if let Some(goss) = self.gossips.get(dest) {
|
||||||
self.store.difference(goss).copied().collect()
|
self.store.difference(goss).copied().collect()
|
||||||
} else {
|
} else {
|
||||||
self.store.clone()
|
self.store.clone()
|
||||||
};
|
};
|
||||||
let goss: Vec<_> = goss.into_iter().collect();
|
let payload: Payload = [(
|
||||||
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
"goss".to_string(),
|
||||||
|
goss.into_iter().collect::<Vec<_>>().into(),
|
||||||
|
)]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
let body = Body::from_type("gossip").with_payload(payload);
|
let body = Body::from_type("gossip").with_payload(payload);
|
||||||
runner.send(other, body);
|
runner.send(dest, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,11 +88,13 @@ impl Node for BCaster {
|
||||||
runner.reply(&req, body);
|
runner.reply(&req, body);
|
||||||
}
|
}
|
||||||
"topology" => {
|
"topology" => {
|
||||||
let nid = runner.node_id();
|
let nodes = runner
|
||||||
let topology =
|
.nodes()
|
||||||
serde_json::from_value(req.body.payload.get("topology").cloned().unwrap())
|
.iter()
|
||||||
.unwrap();
|
.filter(|&n| n.as_str() != runner.node_id())
|
||||||
self.topology(topology, nid);
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
self.nodes = nodes;
|
||||||
let body = Body::from_type("topology_ok");
|
let body = Body::from_type("topology_ok");
|
||||||
runner.reply(&req, body);
|
runner.reply(&req, body);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue