Register listeners in statement distribution (#1759)

* Register listeners in statement distribution

* Review fixes
This commit is contained in:
Fedor Sakharov
2020-09-28 16:55:59 +03:00
committed by GitHub
parent 32cc9e7a44
commit 9756b2d676
3 changed files with 35 additions and 3 deletions
@@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
};
use futures::prelude::*;
use futures::channel::oneshot;
use futures::channel::{mpsc, oneshot};
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
@@ -476,6 +476,24 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}
/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut Vec<mpsc::Sender<SignedFullStatement>>,
) {
// Ignore the errors since these will be removed later.
stream::iter(listeners.iter_mut()).for_each_concurrent(
None,
|listener| async move {
let _ = listener.send(statement.clone()).await;
}
).await;
// Remove any closed listeners.
listeners.retain(|tx| !tx.is_closed());
}
/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
@@ -821,6 +839,7 @@ async fn run(
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new();
loop {
let message = ctx.recv().await?;
@@ -874,14 +893,19 @@ async fn run(
}
FromOverseer::Signal(OverseerSignal::Conclude) => break,
FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) =>
StatementDistributionMessage::Share(relay_parent, statement) => {
inform_statement_listeners(
&statement,
&mut statement_listeners,
).await;
circulate_statement_and_dependents(
&mut peers,
&mut active_heads,
&mut ctx,
relay_parent,
statement,
).await?,
).await?;
}
StatementDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut peers,
@@ -890,6 +914,9 @@ async fn run(
&mut our_view,
event,
).await?,
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
}
}
}
}