Notify collators about seconded collation (#2430)

* Notify collators about seconded collation

This pr adds functionality to inform a collator that its collation was
seconded by a parachain validator. Before this signed statement was only
gossiped over the validation substream. Now, we explicitly send the
seconded statement to the collator after it was validated successfully.

Besides that it changes the `CollatorFn` to return an optional result
sender that is informed when the build collation was seconded by a
parachain validator.

* Add test

* Make sure we only send `Seconded` statements

* Make sure we only receive valid statements

* Review feedback
This commit is contained in:
Bastian Köcher
2021-02-14 17:36:04 +01:00
committed by GitHub
parent 1b3d00d9d3
commit 4975521d48
21 changed files with 315 additions and 147 deletions
@@ -40,7 +40,7 @@ use polkadot_node_network_protocol::{
};
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use futures::channel::oneshot;
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
@@ -499,27 +499,6 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}
type StatementListeners = Vec<mpsc::Sender<SignedFullStatement>>;
/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))]
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut StatementListeners,
) {
// Ignore the errors since these will be removed later.
stream::iter(listeners.iter_mut()).for_each_concurrent(
None,
|listener| async move {
let _ = listener.send(statement.clone()).await;
}
).await;
// Remove any closed listeners.
listeners.retain(|tx| !tx.is_closed());
}
/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
@@ -699,7 +678,6 @@ async fn handle_incoming_message<'a>(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) -> Option<(Hash, &'a StoredStatement)> {
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
@@ -770,8 +748,6 @@ async fn handle_incoming_message<'a>(
Ok(false) => {}
}
inform_statement_listeners(&statement, statement_listeners).await;
// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
match active_head.note_statement(statement) {
@@ -841,7 +817,6 @@ async fn handle_network_update(
our_view: &mut OurView,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
@@ -864,7 +839,6 @@ async fn handle_network_update(
ctx,
message,
metrics,
statement_listeners,
).await
}
None => None,
@@ -931,7 +905,6 @@ impl StatementDistribution {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = OurView::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners = StatementListeners::new();
let metrics = self.metrics;
loop {
@@ -993,10 +966,6 @@ impl StatementDistribution {
StatementDistributionMessage::Share(relay_parent, statement) => {
let _timer = metrics.time_share();
inform_statement_listeners(
&statement,
&mut statement_listeners,
).await;
circulate_statement_and_dependents(
&mut peers,
&mut active_heads,
@@ -1016,12 +985,8 @@ impl StatementDistribution {
&mut our_view,
event,
&metrics,
&mut statement_listeners,
).await;
}
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
}
}
}
}