actually pass efficient broadcast
This commit is contained in:
parent
4d6df5c8a5
commit
f2af9d6a5a
1 changed files with 37 additions and 31 deletions
|
@ -18,7 +18,7 @@ fn main() {
|
||||||
fn on_init(runner: &Runner) {
|
fn on_init(runner: &Runner) {
|
||||||
let tx = runner.get_backdoor();
|
let tx = runner.get_backdoor();
|
||||||
thread::spawn(move || loop {
|
thread::spawn(move || loop {
|
||||||
let millis = rand::thread_rng().gen_range(400..=800);
|
let millis = rand::thread_rng().gen_range(50..=60);
|
||||||
thread::sleep(Duration::from_millis(millis));
|
thread::sleep(Duration::from_millis(millis));
|
||||||
let body = Body::from_type("do_gossip");
|
let body = Body::from_type("do_gossip");
|
||||||
let msg = Message {
|
let msg = Message {
|
||||||
|
@ -33,35 +33,19 @@ fn on_init(runner: &Runner) {
|
||||||
struct BCaster {
|
struct BCaster {
|
||||||
store: HashSet<i64>,
|
store: HashSet<i64>,
|
||||||
gossips: HashMap<String, HashSet<i64>>,
|
gossips: HashMap<String, HashSet<i64>>,
|
||||||
neighbors: HashSet<String>,
|
neighbors: Vec<String>,
|
||||||
|
others: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
const NUM_NEIGHBORS: i32 = 5;
|
|
||||||
|
|
||||||
impl BCaster {
|
impl BCaster {
|
||||||
fn topology(&mut self, nodes: &[String], id: &str) {
|
fn topology(&mut self, topology: HashMap<&str, Vec<String>>, id: &str) {
|
||||||
let mut nodes = nodes.to_vec();
|
self.neighbors = topology[id].clone();
|
||||||
let len = nodes.len();
|
self.neighbors.sort_unstable();
|
||||||
nodes.sort_unstable();
|
for &node in topology.keys() {
|
||||||
let idx = nodes
|
let node = node.to_string();
|
||||||
.iter()
|
if !(node == id || self.neighbors.binary_search(&node).is_ok()) {
|
||||||
.enumerate()
|
self.others.push(node);
|
||||||
.find(|n| n.1.as_str() == id)
|
}
|
||||||
.unwrap()
|
|
||||||
.0;
|
|
||||||
for i in 1..=NUM_NEIGHBORS {
|
|
||||||
let ni = idx as i32 - i;
|
|
||||||
let left = if ni < 0 {
|
|
||||||
let ni = -ni;
|
|
||||||
let ni = ni as usize % len;
|
|
||||||
len - ni
|
|
||||||
} else {
|
|
||||||
ni as usize
|
|
||||||
};
|
|
||||||
|
|
||||||
let right = (idx + i as usize) % len;
|
|
||||||
self.neighbors.insert(nodes[left].clone());
|
|
||||||
self.neighbors.insert(nodes[right].clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,14 +58,16 @@ impl BCaster {
|
||||||
} else {
|
} else {
|
||||||
self.store.clone()
|
self.store.clone()
|
||||||
};
|
};
|
||||||
for v in self.store.iter().filter(|_| rng.gen_bool(0.11)) {
|
for v in self.store.iter().filter(|_| rng.gen_bool(0.20)) {
|
||||||
goss.insert(*v);
|
goss.insert(*v);
|
||||||
}
|
}
|
||||||
if !goss.is_empty() {
|
if !goss.is_empty() && rng.gen_bool(0.5) {
|
||||||
let goss: Vec<_> = goss.into_iter().collect();
|
let goss: Vec<_> = goss.into_iter().collect();
|
||||||
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
let payload: Payload = [("goss".to_string(), goss.into())].into_iter().collect();
|
||||||
let body = Body::from_type("gossip").with_payload(payload);
|
let body = Body::from_type("gossip").with_payload(payload);
|
||||||
runner.send(node, body);
|
runner.send(node, body.clone());
|
||||||
|
let other = &self.others[rng.gen_range(0..self.others.len())];
|
||||||
|
runner.send(other, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,7 +100,27 @@ impl Node for BCaster {
|
||||||
}
|
}
|
||||||
"topology" => {
|
"topology" => {
|
||||||
let nid = runner.node_id();
|
let nid = runner.node_id();
|
||||||
self.topology(runner.nodes(), nid);
|
let topology = req
|
||||||
|
.body
|
||||||
|
.payload
|
||||||
|
.get("topology")
|
||||||
|
.unwrap()
|
||||||
|
.as_object()
|
||||||
|
.unwrap();
|
||||||
|
let topology: HashMap<&str, Vec<String>> = topology
|
||||||
|
.iter()
|
||||||
|
.map(|(k, v)| {
|
||||||
|
(
|
||||||
|
k.as_str(),
|
||||||
|
v.as_array()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.map(|v| v.as_str().unwrap().to_string())
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
self.topology(topology, nid);
|
||||||
let body = Body::from_type("topology_ok");
|
let body = Body::from_type("topology_ok");
|
||||||
runner.reply(&req, body);
|
runner.reply(&req, body);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue