try to be fault tolerant, fail.
This commit is contained in:
parent
75aaf3418b
commit
0be4e33b36
1 changed files with 29 additions and 1 deletions
|
@ -17,10 +17,12 @@ async fn main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Store = Arc<Mutex<HashMap<String, i64>>>;
|
type Store = Arc<Mutex<HashMap<String, i64>>>;
|
||||||
|
type Gossips = Arc<Mutex<HashMap<(u64, String), MessageBody>>>;
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
struct BCaster {
|
struct BCaster {
|
||||||
pub store: Store,
|
pub store: Store,
|
||||||
|
pub gossips: Gossips,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -31,6 +33,18 @@ impl Node for BCaster {
|
||||||
let mid = req.body.msg_id;
|
let mid = req.body.msg_id;
|
||||||
let key = format!("{frm}{mid}");
|
let key = format!("{frm}{mid}");
|
||||||
let key = key.as_str();
|
let key = key.as_str();
|
||||||
|
|
||||||
|
{
|
||||||
|
let gossips: Vec<((_, String), MessageBody)> = {
|
||||||
|
let g = self.gossips.lock().unwrap();
|
||||||
|
g.clone().into_iter().collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
for ((_, node), gossip) in gossips.into_iter() {
|
||||||
|
runtime.send(node, gossip).await.unwrap_or_default();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
match typ {
|
match typ {
|
||||||
"broadcast" => {
|
"broadcast" => {
|
||||||
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
let val = req.body.extra.get("message").and_then(|v| v.as_i64());
|
||||||
|
@ -49,7 +63,13 @@ impl Node for BCaster {
|
||||||
.collect();
|
.collect();
|
||||||
let gossip = MessageBody::from_extra(extra).with_type("gossip");
|
let gossip = MessageBody::from_extra(extra).with_type("gossip");
|
||||||
for node in runtime.neighbours() {
|
for node in runtime.neighbours() {
|
||||||
runtime.send(node, gossip.clone()).await?;
|
let id = runtime.next_msg_id();
|
||||||
|
let gossip = gossip.clone().and_msg_id(id);
|
||||||
|
{
|
||||||
|
let mut g = self.gossips.lock().unwrap();
|
||||||
|
g.insert((id, node.to_owned()), gossip.clone());
|
||||||
|
}
|
||||||
|
runtime.send(node, gossip).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let body = MessageBody::new().with_type("broadcast_ok");
|
let body = MessageBody::new().with_type("broadcast_ok");
|
||||||
|
@ -84,6 +104,14 @@ impl Node for BCaster {
|
||||||
let body = MessageBody::new().with_type("gossip_ok");
|
let body = MessageBody::new().with_type("gossip_ok");
|
||||||
return runtime.reply(req, body).await;
|
return runtime.reply(req, body).await;
|
||||||
}
|
}
|
||||||
|
"gossip_ok" => {
|
||||||
|
let id = req.body.in_reply_to;
|
||||||
|
let key = (id, frm.to_owned());
|
||||||
|
{
|
||||||
|
let mut g = self.gossips.lock().unwrap();
|
||||||
|
g.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue