Make sure we inform statement listeners about received statements (#1999)

This commit is contained in:
Bastian Köcher
2020-11-24 11:50:27 +01:00
committed by GitHub
parent 495172c585
commit 698de0b4fa
@@ -312,7 +312,9 @@ impl PeerData {
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> Result<bool, Rep> {
self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)?
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT)?
.receive(fingerprint, max_message_count)
}
}
@@ -492,13 +494,15 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}
type StatementListeners = Vec<mpsc::Sender<SignedFullStatement>>;
/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))]
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut Vec<mpsc::Sender<SignedFullStatement>>,
listeners: &mut StatementListeners,
) {
// Ignore the errors since these will be removed later.
stream::iter(listeners.iter_mut()).for_each_concurrent(
@@ -524,36 +528,38 @@ async fn circulate_statement_and_dependents(
statement: SignedFullStatement,
metrics: &Metrics,
) {
if let Some(active_head)= active_heads.get_mut(&relay_parent) {
let active_head = match active_heads.get_mut(&relay_parent) {
Some(res) => res,
None => return,
};
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
_ => None,
}
};
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
_ => None,
}
};
// Now send dependent statements to all peers needing them, if any.
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
).await;
}
// Now send dependent statements to all peers needing them, if any.
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
).await;
}
}
}
@@ -679,6 +685,7 @@ async fn handle_incoming_message<'a>(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) -> Option<(Hash, &'a StoredStatement)> {
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
@@ -735,6 +742,8 @@ async fn handle_incoming_message<'a>(
Ok(false) => {}
}
inform_statement_listeners(&statement, statement_listeners).await;
// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
match active_head.note_statement(statement) {
@@ -794,6 +803,7 @@ async fn handle_network_update(
our_view: &mut View,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
@@ -816,6 +826,7 @@ async fn handle_network_update(
ctx,
message,
metrics,
statement_listeners,
).await;
if let Some((relay_parent, new)) = new_stored {
@@ -875,7 +886,7 @@ impl StatementDistribution {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new();
let mut statement_listeners = StatementListeners::new();
let metrics = self.metrics;
loop {
@@ -958,6 +969,7 @@ impl StatementDistribution {
&mut our_view,
event,
&metrics,
&mut statement_listeners,
).await;
}
StatementDistributionMessage::RegisterStatementListener(tx) => {