From 4975521d481a233423eed29ba271fbefb6390bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Sun, 14 Feb 2021 17:36:04 +0100 Subject: [PATCH] Notify collators about seconded collation (#2430) * Notify collators about seconded collation This pr adds functionality to inform a collator that its collation was seconded by a parachain validator. Before this signed statement was only gossiped over the validation substream. Now, we explicitly send the seconded statement to the collator after it was validated successfully. Besides that it changes the `CollatorFn` to return an optional result sender that is informed when the build collation was seconded by a parachain validator. * Add test * Make sure we only send `Seconded` statements * Make sure we only receive valid statements * Review feedback --- polkadot/Cargo.lock | 3 + polkadot/node/collation-generation/src/lib.rs | 13 +-- polkadot/node/core/backing/src/lib.rs | 33 +++++- .../node/core/candidate-selection/Cargo.toml | 1 + .../node/core/candidate-selection/src/lib.rs | 49 ++++++++- .../node/network/collator-protocol/Cargo.toml | 2 +- .../collator-protocol/src/collator_side.rs | 40 +++++-- .../collator-protocol/src/validator_side.rs | 100 +++++++++--------- polkadot/node/network/protocol/src/lib.rs | 3 + .../network/statement-distribution/src/lib.rs | 37 +------ polkadot/node/overseer/src/lib.rs | 4 +- polkadot/node/primitives/src/lib.rs | 28 ++++- polkadot/node/subsystem/src/messages.rs | 21 ++-- .../test-parachains/adder/collator/Cargo.toml | 1 + .../test-parachains/adder/collator/src/lib.rs | 61 +++++++++-- .../adder/collator/src/main.rs | 2 +- .../adder/collator/tests/integration.rs | 9 +- .../node/collators/collation-generation.md | 24 ++++- .../src/node/subsystems-and-jobs.md | 6 -- .../implementers-guide/src/types/network.md | 2 + .../src/types/overseer-protocol.md | 23 ++-- 21 files changed, 315 insertions(+), 147 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 0a037ae767..f204c3e14f 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5157,6 +5157,7 @@ dependencies = [ "futures-timer 3.0.2", "log", "polkadot-node-network-protocol", + "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", @@ -5337,6 +5338,7 @@ name = "polkadot-node-core-candidate-selection" version = "0.1.0" dependencies = [ "futures 0.3.12", + "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", @@ -9286,6 +9288,7 @@ dependencies = [ name = "test-parachain-adder-collator" version = "0.7.26" dependencies = [ + "assert_matches", "futures 0.3.12", "futures-timer 3.0.2", "log", diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 09f3db3fd0..4a93152f97 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -276,8 +276,8 @@ async fn handle_new_activations( ctx.spawn("collation generation collation builder", Box::pin(async move { let persisted_validation_data_hash = validation_data.hash(); - let collation = match (task_config.collator)(relay_parent, &validation_data).await { - Some(collation) => collation, + let (collation, result_sender) = match (task_config.collator)(relay_parent, &validation_data).await { + Some(collation) => collation.into_inner(), None => { tracing::debug!( target: LOG_TARGET, @@ -348,7 +348,7 @@ async fn handle_new_activations( metrics.on_collation_generated(); if let Err(err) = task_sender.send(AllMessages::CollatorProtocol( - CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity) + CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity, result_sender) )).await { tracing::warn!( target: LOG_TARGET, @@ -465,7 +465,7 @@ mod tests { task::{Context as FuturesContext, Poll}, Future, }; - use polkadot_node_primitives::Collation; + use polkadot_node_primitives::{Collation, CollationResult}; use polkadot_node_subsystem::messages::{ AllMessages, RuntimeApiMessage, RuntimeApiRequest, }; @@ -496,10 +496,10 @@ mod tests { struct TestCollator; impl Future for TestCollator { - type Output = Option; + type Output = Option; fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll { - Poll::Ready(Some(test_collation())) + Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None })) } } @@ -755,6 +755,7 @@ mod tests { AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation( CandidateReceipt { descriptor, .. }, _pov, + .. )) => { // signature generation is non-deterministic, so we can't just assert that the // expected descriptor is correct. What we can do is validate that the produced diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 5a8a783695..4551c4feaa 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -528,7 +528,12 @@ impl CandidateBackingJob { descriptor: candidate.descriptor.clone(), commitments, }); - self.sign_import_and_distribute_statement(statement, parent_span).await?; + if let Some(stmt) = self.sign_import_and_distribute_statement( + statement, + parent_span, + ).await? { + self.issue_candidate_seconded_message(stmt).await?; + } self.distribute_pov(candidate.descriptor, pov).await?; } } @@ -586,6 +591,15 @@ impl CandidateBackingJob { Ok(()) } + async fn issue_candidate_seconded_message( + &mut self, + statement: SignedFullStatement, + ) -> Result<(), Error> { + self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Seconded(self.parent, statement)).into()).await?; + + Ok(()) + } + /// Kick off background validation with intent to second. #[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))] async fn validate_and_second( @@ -631,13 +645,14 @@ impl CandidateBackingJob { &mut self, statement: Statement, parent_span: &JaegerSpan, - ) -> Result<(), Error> { + ) -> Result, Error> { if let Some(signed_statement) = self.sign_statement(statement).await { self.import_statement(&signed_statement, parent_span).await?; - self.distribute_signed_statement(signed_statement).await?; + self.distribute_signed_statement(signed_statement.clone()).await?; + Ok(Some(signed_statement)) + } else { + Ok(None) } - - Ok(()) } /// Check if there have happened any new misbehaviors and issue necessary messages. @@ -1486,6 +1501,14 @@ mod tests { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Seconded(hash, statement)) => { + assert_eq!(test_state.relay_parent, hash); + assert_matches!(statement.payload(), Statement::Seconded(_)); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::PoVDistribution(PoVDistributionMessage::DistributePoV(hash, descriptor, pov_received)) => { diff --git a/polkadot/node/core/candidate-selection/Cargo.toml b/polkadot/node/core/candidate-selection/Cargo.toml index c85a0cef3c..f5b473d0be 100644 --- a/polkadot/node/core/candidate-selection/Cargo.toml +++ b/polkadot/node/core/candidate-selection/Cargo.toml @@ -14,6 +14,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste polkadot-primitives = { path = "../../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } [dev-dependencies] diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 51eaa80a47..daee7d530d 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -39,6 +39,7 @@ use polkadot_node_subsystem_util::{ use polkadot_primitives::v1::{ CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, }; +use polkadot_node_primitives::SignedFullStatement; use std::{pin::Pin, sync::Arc}; use thiserror::Error; @@ -190,6 +191,10 @@ impl CandidateSelectionJob { let _span = span.child("handle-invalid"); self.handle_invalid(candidate_receipt).await; } + Some(CandidateSelectionMessage::Seconded(_, statement)) => { + let _span = span.child("handle-seconded"); + self.handle_seconded(statement).await; + } None => break, } } @@ -251,9 +256,7 @@ impl CandidateSelectionJob { pov, &mut self.sender, &self.metrics, - ) - .await - { + ).await { Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"), Ok(()) => self.seconded_candidate = Some(collator_id), } @@ -293,6 +296,46 @@ impl CandidateSelectionJob { }; self.metrics.on_invalid_selection(result); } + + async fn handle_seconded(&mut self, statement: SignedFullStatement) { + let received_from = match &self.seconded_candidate { + Some(peer) => peer, + None => { + tracing::warn!( + target: LOG_TARGET, + "received seconded notice for a candidate we don't remember seconding" + ); + return; + } + }; + tracing::debug!( + target: LOG_TARGET, + statement = ?statement, + "received seconded note for candidate", + ); + + if let Err(e) = self.sender + .send(AllMessages::from(CollatorProtocolMessage::NoteGoodCollation(received_from.clone())).into()).await + { + tracing::debug!( + target: LOG_TARGET, + error = ?e, + "failed to note good collator" + ); + } + + if let Err(e) = self.sender + .send(AllMessages::from( + CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement) + ).into()).await + { + tracing::debug!( + target: LOG_TARGET, + error = ?e, + "failed to notify collator about seconded collation" + ); + } + } } // get a collation from the Collator Protocol subsystem diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index f392924e96..6129467e9e 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -10,9 +10,9 @@ tracing = "0.1.22" tracing-futures = "0.2.4" thiserror = "1.0.23" - polkadot-primitives = { path = "../../../primitives" } polkadot-node-network-protocol = { path = "../../network/protocol" } +polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 7cbb312498..57122567da 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -18,10 +18,10 @@ use std::collections::{HashMap, HashSet}; use super::{LOG_TARGET, Result}; -use futures::{select, FutureExt}; +use futures::{select, FutureExt, channel::oneshot}; use polkadot_primitives::v1::{ - CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, + CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash, }; use polkadot_subsystem::{ jaeger, PerLeafSpan, @@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::{ request_availability_cores_ctx, metrics::{self, prometheus}, }; +use polkadot_node_primitives::{SignedFullStatement, Statement}; #[derive(Clone, Default)] pub struct Metrics(Option); @@ -195,6 +196,9 @@ struct State { /// We will keep up to one local collation per relay-parent. collations: HashMap, + /// The result senders per collation. + collation_result_senders: HashMap>, + /// Our validator groups per active leaf. our_validators_groups: HashMap, @@ -230,6 +234,7 @@ async fn distribute_collation( id: ParaId, receipt: CandidateReceipt, pov: PoV, + result_sender: Option>, ) -> Result<()> { let relay_parent = receipt.descriptor.relay_parent; @@ -289,6 +294,10 @@ async fn distribute_collation( state.our_validators_groups.insert(relay_parent, current_validators.into()); + if let Some(result_sender) = result_sender { + state.collation_result_senders.insert(receipt.hash(), result_sender); + } + state.collations.insert(relay_parent, (receipt, pov)); Ok(()) @@ -438,7 +447,7 @@ async fn process_msg( CollateOn(id) => { state.collating_on = Some(id); } - DistributeCollation(receipt, pov) => { + DistributeCollation(receipt, pov, result_sender) => { let _span1 = state.span_per_relay_parent .get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation")); let _span2 = jaeger::pov_span(&pov, "distributing-collation"); @@ -454,7 +463,7 @@ async fn process_msg( ); } Some(id) => { - distribute_collation(ctx, state, id, receipt, pov).await?; + distribute_collation(ctx, state, id, receipt, pov, result_sender).await?; } None => { tracing::warn!( @@ -483,6 +492,12 @@ async fn process_msg( "NoteGoodCollation message is not expected on the collator side of the protocol", ); } + NotifyCollationSeconded(_, _) => { + tracing::warn!( + target: LOG_TARGET, + "NotifyCollationSeconded message is not expected on the collator side of the protocol", + ); + } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, @@ -591,6 +606,17 @@ async fn handle_incoming_peer_message( "Collation message is not expected on the collator side of the protocol", ); } + CollationSeconded(statement) => { + if !matches!(statement.payload(), Statement::Seconded(_)) { + tracing::warn!( + target: LOG_TARGET, + statement = ?statement, + "Collation seconded message received with none-seconded statement.", + ); + } else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) { + let _ = sender.send(statement); + } + } } Ok(()) @@ -685,7 +711,9 @@ async fn handle_our_view_change( view: OurView, ) -> Result<()> { for removed in state.view.difference(&view) { - state.collations.remove(removed); + if let Some((receipt, _)) = state.collations.remove(removed) { + state.collation_result_senders.remove(&receipt.hash()); + } state.our_validators_groups.remove(removed); state.connection_requests.remove_all(removed); state.span_per_relay_parent.remove(removed); @@ -1054,7 +1082,7 @@ mod tests { overseer_send( virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None), ).await; // obtain the availability cores. diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index a53bdf4a17..236e60ca08 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -39,6 +39,7 @@ use polkadot_node_network_protocol::{ v1 as protocol_v1, View, OurView, PeerId, ReputationChange as Rep, RequestId, }; use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}}; +use polkadot_node_primitives::{Statement, SignedFullStatement}; use super::{modify_reputation, LOG_TARGET, Result}; @@ -200,9 +201,6 @@ struct State { /// Delay after which a collation request would time out. request_timeout: Duration, - /// Possessed collations. - collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>, - /// Leaves have recently moved out of scope. /// These are looked into when we receive previously requested collations that we /// are no longer interested in. @@ -228,35 +226,13 @@ async fn fetch_collation( where Context: SubsystemContext { - // First take a look if we have already stored some of the relevant collations. - if let Some(collations) = state.collations.get(&(relay_parent, para_id)) { - for collation in collations.iter() { - if collation.0 == collator_id { - if let Err(e) = tx.send((collation.1.clone(), collation.2.clone())) { - // We do not want this to be fatal because the receving subsystem - // may have closed the results channel for some reason. - tracing::trace!( - target: LOG_TARGET, - err = ?e, - "Failed to send collation", - ); - } - return; - } + let relevant_advertiser = state.advertisements.iter().find_map(|(k, v)| { + if v.contains(&(para_id, relay_parent)) && state.known_collators.get(k) == Some(&collator_id) { + Some(k.clone()) + } else { + None } - } - - // Dodge multiple references to `state`. - let mut relevant_advertiser = None; - - // Has the collator in question advertised a relevant collation? - for (k, v) in state.advertisements.iter() { - if v.contains(&(para_id, relay_parent)) { - if state.known_collators.get(k) == Some(&collator_id) { - relevant_advertiser = Some(k.clone()); - } - } - } + }); // Request the collation. // Assume it is `request_collation`'s job to check and ignore duplicate requests. @@ -278,10 +254,8 @@ where // Since we have a one way map of PeerId -> CollatorId we have to // iterate here. Since a huge amount of peers is not expected this // is a tolerable thing to do. - for (k, v) in state.known_collators.iter() { - if *v == id { - modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await; - } + for (k, _) in state.known_collators.iter().filter(|d| *d.1 == id) { + modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await; } } @@ -295,10 +269,41 @@ async fn note_good_collation( where Context: SubsystemContext { - for (peer_id, collator_id) in state.known_collators.iter() { - if id == *collator_id { - modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await; - } + for (peer_id, _) in state.known_collators.iter().filter(|d| *d.1 == id) { + modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await; + } +} + +/// Notify a collator that its collation got seconded. +#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +async fn notify_collation_seconded( + ctx: &mut impl SubsystemContext, + state: &mut State, + id: CollatorId, + statement: SignedFullStatement, +) { + if !matches!(statement.payload(), Statement::Seconded(_)) { + tracing::error!( + target: LOG_TARGET, + statement = ?statement, + "Notify collation seconded called with a wrong statement.", + ); + return; + } + + let peer_ids = state.known_collators.iter() + .filter_map(|(p, c)| if *c == id { Some(p.clone()) } else { None }) + .collect::>(); + + if !peer_ids.is_empty() { + let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + peer_ids, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await; } } @@ -368,7 +373,7 @@ where if id == request_id { if let Some(per_request) = state.requests_info.remove(&id) { let _ = per_request.received.send(()); - if let Some(collator_id) = state.known_collators.get(&origin) { + if state.known_collators.get(&origin).is_some() { let pov = match pov.decompress() { Ok(pov) => pov, Err(error) => { @@ -395,11 +400,6 @@ where let _ = per_request.result.send((receipt.clone(), pov.clone())); state.metrics.on_request(Ok(())); - - state.collations - .entry((relay_parent, para_id)) - .or_default() - .push((collator_id.clone(), receipt, pov)); } } } @@ -558,6 +558,9 @@ where .map(|s| s.child("received-collation")); received_collation(ctx, state, origin, request_id, receipt, pov).await; } + CollationSeconded(_) => { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + } } } @@ -584,8 +587,6 @@ async fn remove_relay_parent( } } - state.collations.retain(|k, _| k.0 != relay_parent); - Ok(()) } @@ -705,7 +706,7 @@ where "CollateOn message is not expected on the validator side of the protocol", ); } - DistributeCollation(_, _) => { + DistributeCollation(_, _, _) => { tracing::warn!( target: LOG_TARGET, "DistributeCollation message is not expected on the validator side of the protocol", @@ -721,6 +722,9 @@ where NoteGoodCollation(id) => { note_good_collation(ctx, state, id).await; } + NotifyCollationSeconded(id, statement) => { + notify_collation_seconded(ctx, state, id, statement).await; + } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 81bd80f527..997d630636 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -396,6 +396,9 @@ pub mod v1 { /// A requested collation. #[codec(index = 3)] Collation(RequestId, CandidateReceipt, CompressedPoV), + /// A collation sent to a validator was seconded. + #[codec(index = 4)] + CollationSeconded(SignedFullStatement), } /// All network messages on the validation peer-set. diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 960ff129ba..42e4816315 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -40,7 +40,7 @@ use polkadot_node_network_protocol::{ }; use futures::prelude::*; -use futures::channel::{mpsc, oneshot}; +use futures::channel::oneshot; use indexmap::IndexSet; use std::collections::{HashMap, HashSet}; @@ -499,27 +499,6 @@ fn check_statement_signature( .and_then(|v| statement.check_signature(&signing_context, v)) } -type StatementListeners = Vec>; - -/// Informs all registered listeners about a newly received statement. -/// -/// Removes all closed listeners. -#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))] -async fn inform_statement_listeners( - statement: &SignedFullStatement, - listeners: &mut StatementListeners, -) { - // Ignore the errors since these will be removed later. - stream::iter(listeners.iter_mut()).for_each_concurrent( - None, - |listener| async move { - let _ = listener.send(statement.clone()).await; - } - ).await; - // Remove any closed listeners. - listeners.retain(|tx| !tx.is_closed()); -} - /// Places the statement in storage if it is new, and then /// circulates the statement to all peers who have not seen it yet, and /// sends all statements dependent on that statement to peers who could previously not receive @@ -699,7 +678,6 @@ async fn handle_incoming_message<'a>( ctx: &mut impl SubsystemContext, message: protocol_v1::StatementDistributionMessage, metrics: &Metrics, - statement_listeners: &mut StatementListeners, ) -> Option<(Hash, &'a StoredStatement)> { let (relay_parent, statement) = match message { protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), @@ -770,8 +748,6 @@ async fn handle_incoming_message<'a>( Ok(false) => {} } - inform_statement_listeners(&statement, statement_listeners).await; - // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. match active_head.note_statement(statement) { @@ -841,7 +817,6 @@ async fn handle_network_update( our_view: &mut OurView, update: NetworkBridgeEvent, metrics: &Metrics, - statement_listeners: &mut StatementListeners, ) { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { @@ -864,7 +839,6 @@ async fn handle_network_update( ctx, message, metrics, - statement_listeners, ).await } None => None, @@ -931,7 +905,6 @@ impl StatementDistribution { let mut peers: HashMap = HashMap::new(); let mut our_view = OurView::default(); let mut active_heads: HashMap = HashMap::new(); - let mut statement_listeners = StatementListeners::new(); let metrics = self.metrics; loop { @@ -993,10 +966,6 @@ impl StatementDistribution { StatementDistributionMessage::Share(relay_parent, statement) => { let _timer = metrics.time_share(); - inform_statement_listeners( - &statement, - &mut statement_listeners, - ).await; circulate_statement_and_dependents( &mut peers, &mut active_heads, @@ -1016,12 +985,8 @@ impl StatementDistribution { &mut our_view, event, &metrics, - &mut statement_listeners, ).await; } - StatementDistributionMessage::RegisterStatementListener(tx) => { - statement_listeners.push(tx); - } } } } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 8eed3bfc5e..c7f2c36a04 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -1962,7 +1962,7 @@ mod tests { use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash}; use polkadot_subsystem::{messages::RuntimeApiRequest, messages::NetworkBridgeEvent, JaegerSpan}; - use polkadot_node_primitives::{Collation, CollationGenerationConfig}; + use polkadot_node_primitives::{CollationResult, CollationGenerationConfig}; use polkadot_node_network_protocol::{PeerId, ReputationChange}; use polkadot_node_subsystem_util::metered; @@ -2631,7 +2631,7 @@ mod tests { struct TestCollator; impl Future for TestCollator { - type Output = Option; + type Output = Option; fn poll(self: Pin<&mut Self>, _cx: &mut futures::task::Context) -> Poll { panic!("at the Disco") diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index 1727b14868..b00df8539d 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -157,13 +157,33 @@ pub struct Collation { pub hrmp_watermark: BlockNumber, } +/// Result of the [`CollatorFn`] invocation. +pub struct CollationResult { + /// The collation that was build. + pub collation: Collation, + /// An optional result sender that should be informed about a successfully seconded collation. + /// + /// There is no guarantee that this sender is informed ever about any result, it is completly okay to just drop it. + /// However, if it is called, it should be called with the signed statement of a parachain validator seconding the + /// collation. + pub result_sender: Option>, +} + +impl CollationResult { + /// Convert into the inner values. + pub fn into_inner(self) -> (Collation, Option>) { + (self.collation, self.result_sender) + } +} + /// Collation function. /// -/// Will be called with the hash of the relay chain block the parachain -/// block should be build on and the [`ValidationData`] that provides -/// information about the state of the parachain on the relay chain. +/// Will be called with the hash of the relay chain block the parachain block should be build on and the +/// [`ValidationData`] that provides information about the state of the parachain on the relay chain. +/// +/// Returns an optional [`CollationResult`]. pub type CollatorFn = Box< - dyn Fn(Hash, &PersistedValidationData) -> Pin> + Send>> + dyn Fn(Hash, &PersistedValidationData) -> Pin> + Send>> + Send + Sync, >; diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 94cb6e44be..f670fdd5af 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -63,8 +63,13 @@ pub enum CandidateSelectionMessage { /// A candidate collation can be fetched from a collator and should be considered for seconding. Collation(Hash, ParaId, CollatorId), /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. + /// /// The hash is the relay parent. Invalid(Hash, CandidateReceipt), + /// The candidate we recommended to be seconded was validated successfully. + /// + /// The hash is the relay parent. + Seconded(Hash, SignedFullStatement), } impl BoundToRelayParent for CandidateSelectionMessage { @@ -72,6 +77,7 @@ impl BoundToRelayParent for CandidateSelectionMessage { match self { Self::Collation(hash, ..) => *hash, Self::Invalid(hash, _) => *hash, + Self::Seconded(hash, _) => *hash, } } } @@ -174,8 +180,11 @@ pub enum CollatorProtocolMessage { /// /// This should be sent before any `DistributeCollation` message. CollateOn(ParaId), - /// Provide a collation to distribute to validators. - DistributeCollation(CandidateReceipt, PoV), + /// Provide a collation to distribute to validators with an optional result sender. + /// + /// The result sender should be informed when at least one parachain validator seconded the collation. It is also + /// completely okay to just drop the sender. + DistributeCollation(CandidateReceipt, PoV, Option>), /// Fetch a collation under the given relay-parent for the given ParaId. FetchCollation(Hash, CollatorId, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>), /// Report a collator as having provided an invalid collation. This should lead to disconnect @@ -183,6 +192,8 @@ pub enum CollatorProtocolMessage { ReportCollator(CollatorId), /// Note a collator as having provided a good collation. NoteGoodCollation(CollatorId), + /// Notify a collator that its collation was seconded. + NotifyCollationSeconded(CollatorId, SignedFullStatement), /// Get a network bridge update. NetworkBridgeUpdateV1(NetworkBridgeEvent), } @@ -192,11 +203,12 @@ impl CollatorProtocolMessage { pub fn relay_parent(&self) -> Option { match self { Self::CollateOn(_) => None, - Self::DistributeCollation(receipt, _) => Some(receipt.descriptor().relay_parent), + Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent), Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent), Self::ReportCollator(_) => None, Self::NoteGoodCollation(_) => None, Self::NetworkBridgeUpdateV1(_) => None, + Self::NotifyCollationSeconded(_, _) => None, } } } @@ -503,8 +515,6 @@ pub enum StatementDistributionMessage { Share(Hash, SignedFullStatement), /// Event from the network bridge. NetworkBridgeUpdateV1(NetworkBridgeEvent), - /// Register a listener for shared statements. - RegisterStatementListener(mpsc::Sender), } impl StatementDistributionMessage { @@ -513,7 +523,6 @@ impl StatementDistributionMessage { match self { Self::Share(hash, _) => Some(*hash), Self::NetworkBridgeUpdateV1(_) => None, - Self::RegisterStatementListener(_) => None, } } } diff --git a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml index 4068d3c3f9..b869e08fd2 100644 --- a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml +++ b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.12" futures-timer = "3.0.2" log = "0.4.13" structopt = "0.3.21" +assert_matches = "1.4.0" test-parachain-adder = { path = ".." } polkadot-primitives = { path = "../../../../primitives" } diff --git a/polkadot/parachain/test-parachains/adder/collator/src/lib.rs b/polkadot/parachain/test-parachains/adder/collator/src/lib.rs index d8d8d3c1bb..ce7acd40c9 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/lib.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/lib.rs @@ -17,16 +17,18 @@ //! Collator for the adder test parachain. use futures_timer::Delay; -use polkadot_node_primitives::{Collation, CollatorFn}; +use polkadot_node_primitives::{Collation, CollatorFn, CollationResult, Statement, SignedFullStatement}; use polkadot_primitives::v1::{CollatorId, CollatorPair, PoV}; use parity_scale_codec::{Encode, Decode}; -use sp_core::Pair; +use sp_core::{Pair, traits::SpawnNamed}; use std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, atomic::{AtomicU32, Ordering}}, time::Duration, }; use test_parachain_adder::{execute, hash_state, BlockData, HeadData}; +use futures::channel::oneshot; +use assert_matches::assert_matches; /// The amount we add when producing a new block. /// @@ -102,6 +104,7 @@ impl State { pub struct Collator { state: Arc>, key: CollatorPair, + seconded_collations: Arc, } impl Collator { @@ -110,6 +113,7 @@ impl Collator { Self { state: Arc::new(Mutex::new(State::genesis())), key: CollatorPair::generate().0, + seconded_collations: Arc::new(AtomicU32::new(0)), } } @@ -142,10 +146,11 @@ impl Collator { /// Create the collation function. /// /// This collation function can be plugged into the overseer to generate collations for the adder parachain. - pub fn create_collation_function(&self) -> CollatorFn { + pub fn create_collation_function(&self, spawner: impl SpawnNamed + Clone + 'static) -> CollatorFn { use futures::FutureExt as _; let state = self.state.clone(); + let seconded_collations = self.seconded_collations.clone(); Box::new(move |relay_parent, validation_data| { let parent = HeadData::decode(&mut &validation_data.parent_head.0[..]) @@ -159,19 +164,33 @@ impl Collator { block_data, ); + let pov = PoV { block_data: block_data.encode().into() }; + let collation = Collation { upward_messages: Vec::new(), horizontal_messages: Vec::new(), new_validation_code: None, head_data: head_data.encode().into(), - proof_of_validity: PoV { - block_data: block_data.encode().into(), - }, + proof_of_validity: pov.clone(), processed_downward_messages: 0, hrmp_watermark: validation_data.relay_parent_number, }; - async move { Some(collation) }.boxed() + let (result_sender, recv) = oneshot::channel::(); + let seconded_collations = seconded_collations.clone(); + spawner.spawn("adder-collator-seconded", async move { + if let Ok(res) = recv.await { + assert_matches!( + res.payload(), + Statement::Seconded(s) if s.descriptor.pov_hash == pov.hash(), + "Seconded statement should match our collation!", + ); + + seconded_collations.fetch_add(1, Ordering::Relaxed); + } + }.boxed()); + + async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }.boxed() }) } @@ -188,6 +207,21 @@ impl Collator { } } } + + /// Wait until `seconded` collations of this collator are seconded by a parachain validator. + /// + /// The internal counter isn't de-duplicating the collations when counting the number of seconded collations. This + /// means when one collation is seconded by X validators, we record X seconded messages. + pub async fn wait_for_seconded_collations(&self, seconded: u32) { + let seconded_collations = self.seconded_collations.clone(); + loop { + Delay::new(Duration::from_secs(1)).await; + + if seconded <= seconded_collations.load(Ordering::Relaxed) { + return; + } + } + } } #[cfg(test)] @@ -200,8 +234,9 @@ mod tests { #[test] fn collator_works() { + let spawner = sp_core::testing::TaskExecutor::new(); let collator = Collator::new(); - let collation_function = collator.create_collation_function(); + let collation_function = collator.create_collation_function(spawner); for i in 0..5 { let parent_head = collator @@ -220,11 +255,15 @@ mod tests { let collation = block_on(collation_function(Default::default(), &validation_data)).unwrap(); - validate_collation(&collator, (*parent_head).clone(), collation); + validate_collation(&collator, (*parent_head).clone(), collation.collation); } } - fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) { + fn validate_collation( + collator: &Collator, + parent_head: HeadData, + collation: Collation, + ) { let ret = polkadot_parachain::wasm_executor::validate_candidate( collator.validation_code(), ValidationParams { diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index 4016e6d7d7..5ced3673e0 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -81,7 +81,7 @@ fn main() -> Result<()> { let config = CollationGenerationConfig { key: collator.collator_key(), - collator: collator.create_collation_function(), + collator: collator.create_collation_function(full_node.task_manager.spawn_handle()), para_id, }; overseer_handler diff --git a/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs b/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs index 215e32d7cc..fbc2086752 100644 --- a/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs +++ b/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs @@ -63,11 +63,18 @@ async fn collating_using_adder_collator(task_executor: sc_service::TaskExecutor) collator.collator_id(), ); - charlie.register_collator(collator.collator_key(), para_id, collator.create_collation_function()).await; + charlie.register_collator( + collator.collator_key(), + para_id, + collator.create_collation_function(charlie.task_manager.spawn_handle()), + ).await; // Wait until the parachain has 4 blocks produced. collator.wait_for_blocks(4).await; + // Wait until the collator received `12` seconded statements for its collations. + collator.wait_for_seconded_collations(12).await; + join!( alice.task_manager.clean_shutdown(), bob.task_manager.clean_shutdown(), diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md index 5b9e8d6654..34be8ea7c1 100644 --- a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md +++ b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md @@ -32,8 +32,28 @@ pub struct Collation { pub proof_of_validity: PoV, } -type CollatorFn = Box< - dyn Fn(Hash, &PeristedValidationData) -> Pin>>> +/// Result of the [`CollatorFn`] invocation. +pub struct CollationResult { + /// The collation that was build. + collation: Collation, + /// An optional result sender that should be informed about a successfully seconded collation. + /// + /// There is no guarantee that this sender is informed ever about any result, it is completly okay to just drop it. + /// However, if it is called, it should be called with the signed statement of a parachain validator seconding the + /// collation. + result_sender: Option>, +} + +/// Collation function. +/// +/// Will be called with the hash of the relay chain block the parachain block should be build on and the +/// [`ValidationData`] that provides information about the state of the parachain on the relay chain. +/// +/// Returns an optional [`CollationResult`]. +pub type CollatorFn = Box< + dyn Fn(Hash, &PersistedValidationData) -> Pin> + Send>> + + Send + + Sync, >; struct CollationGenerationConfig { diff --git a/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md b/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md index e792cd35f6..1d0a6e7b7b 100644 --- a/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md +++ b/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md @@ -257,24 +257,18 @@ with implementing a gossip protocol: sequenceDiagram participant SD as StatementDistribution participant NB as NetworkBridge - participant Listener alt On receipt of a
SignedStatement from CandidateBacking % fn circulate_statement_and_dependents SD ->> NB: SendValidationMessage Note right of NB: Bridge sends validation message to all appropriate peers - else On initialization, from other subsystems: - Listener ->> SD: RegisterStatementListener else On receipt of peer validation message NB ->> SD: NetworkBridgeUpdateV1 % fn handle_incoming_message alt if we aren't already aware of the relay parent for this statement SD ->> NB: ReportPeer - else the statement corresponds to our View - Note over SD,Listener: Forward the statement to each registered listener - SD ->> Listener: SignedFullStatement end % fn circulate_statement diff --git a/polkadot/roadmap/implementers-guide/src/types/network.md b/polkadot/roadmap/implementers-guide/src/types/network.md index eb7bbe7070..a2da009d7c 100644 --- a/polkadot/roadmap/implementers-guide/src/types/network.md +++ b/polkadot/roadmap/implementers-guide/src/types/network.md @@ -105,6 +105,8 @@ enum CollatorProtocolV1Message { RequestCollation(RequestId, Hash, ParaId), /// A requested collation. Collation(RequestId, CandidateReceipt, CompressedPoV), + /// A collation sent to a validator was seconded. + CollationSeconded(SignedFullStatement), } ``` diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 60994961a8..a9aa308983 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -234,10 +234,12 @@ These messages are sent to the [Candidate Selection subsystem](../node/backing/c ```rust enum CandidateSelectionMessage { - /// A candidate collation can be fetched from a collator and should be considered for seconding. - Collation(RelayParent, ParaId, CollatorId), - /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. - Invalid(CandidateReceipt), + /// A candidate collation can be fetched from a collator and should be considered for seconding. + Collation(RelayParent, ParaId, CollatorId), + /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. + Invalid(RelayParent, CandidateReceipt), + /// The candidate we recommended to be seconded was validated successfully. + Seconded(RelayParent, SignedFullStatement), } ``` @@ -290,15 +292,20 @@ enum CollatorProtocolMessage { /// /// This should be sent before any `DistributeCollation` message. CollateOn(ParaId), - /// Provide a collation to distribute to validators. - DistributeCollation(CandidateReceipt, PoV), + /// Provide a collation to distribute to validators with an optional result sender. + /// + /// The result sender should be informed when at least one parachain validator seconded the collation. It is also + /// completely okay to just drop the sender. + DistributeCollation(CandidateReceipt, PoV, Option>), /// Fetch a collation under the given relay-parent for the given ParaId. FetchCollation(Hash, ParaId, ResponseChannel<(CandidateReceipt, PoV)>), /// Report a collator as having provided an invalid collation. This should lead to disconnect /// and blacklist of the collator. ReportCollator(CollatorId), /// Note a collator as having provided a good collation. - NoteGoodCollation(CollatorId), + NoteGoodCollation(CollatorId, SignedFullStatement), + /// Notify a collator that its collation was seconded. + NotifyCollationSeconded(CollatorId, SignedFullStatement), } ``` @@ -539,8 +546,6 @@ enum StatementDistributionMessage { /// The statement distribution subsystem assumes that the statement should be correctly /// signed. Share(Hash, SignedFullStatement), - /// Register a listener to be notified on any new statements. - RegisterStatementListener(ResponseChannel), } ```