diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index f3d6ba663d..758bc2b7a9 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6055,22 +6055,6 @@ dependencies = [ "wasm-timer", ] -[[package]] -name = "polkadot-node-core-candidate-selection" -version = "0.1.0" -dependencies = [ - "futures 0.3.14", - "polkadot-node-primitives", - "polkadot-node-subsystem", - "polkadot-node-subsystem-test-helpers", - "polkadot-node-subsystem-util", - "polkadot-primitives", - "sp-core", - "sp-keystore", - "thiserror", - "tracing", -] - [[package]] name = "polkadot-node-core-candidate-validation" version = "0.1.0" @@ -6667,7 +6651,6 @@ dependencies = [ "polkadot-node-core-av-store", "polkadot-node-core-backing", "polkadot-node-core-bitfield-signing", - "polkadot-node-core-candidate-selection", "polkadot-node-core-candidate-validation", "polkadot-node-core-chain-api", "polkadot-node-core-parachains-inherent", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 79381029e0..da24aa45cb 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -48,7 +48,6 @@ members = [ "node/core/av-store", "node/core/backing", "node/core/bitfield-signing", - "node/core/candidate-selection", "node/core/candidate-validation", "node/core/chain-api", "node/core/parachains-inherent", diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 67f635a774..3d5ac78873 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -39,7 +39,7 @@ use polkadot_subsystem::{ jaeger, messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, - CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, + CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage, ProvisionableData, ProvisionerMessage, RuntimeApiRequest, StatementDistributionMessage, ValidationFailed } @@ -600,14 +600,14 @@ impl CandidateBackingJob { root_span, ).await? 
{ sender.send_message( - CandidateSelectionMessage::Seconded(self.parent, stmt).into() + CollatorProtocolMessage::Seconded(self.parent, stmt).into() ).await; } } } Err(candidate) => { sender.send_message( - CandidateSelectionMessage::Invalid(self.parent, candidate).into() + CollatorProtocolMessage::Invalid(self.parent, candidate).into() ).await; } } @@ -685,7 +685,7 @@ impl CandidateBackingJob { .map_or(false, |c| c != &candidate.descriptor().collator) { sender.send_message( - CandidateSelectionMessage::Invalid(self.parent, candidate.clone()).into() + CollatorProtocolMessage::Invalid(self.parent, candidate.clone()).into() ).await; return Ok(()); } @@ -1332,7 +1332,7 @@ mod tests { use futures::{future, Future}; use polkadot_primitives::v1::{GroupRotationInfo, HeadData, PersistedValidationData, ScheduledCore}; use polkadot_subsystem::{ - messages::{RuntimeApiRequest, RuntimeApiMessage}, + messages::{RuntimeApiRequest, RuntimeApiMessage, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf, LeafStatus, }; use polkadot_node_primitives::{InvalidCandidate, BlockData}; @@ -1648,7 +1648,7 @@ mod tests { assert_matches!( virtual_overseer.recv().await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Seconded(hash, statement)) => { + AllMessages::CollatorProtocol(CollatorProtocolMessage::Seconded(hash, statement)) => { assert_eq!(test_state.relay_parent, hash); assert_matches!(statement.payload(), Statement::Seconded(_)); } @@ -2172,8 +2172,8 @@ mod tests { assert_matches!( virtual_overseer.recv().await, - AllMessages::CandidateSelection( - CandidateSelectionMessage::Invalid(parent_hash, c) + AllMessages::CollatorProtocol( + CollatorProtocolMessage::Invalid(parent_hash, c) ) if parent_hash == test_state.relay_parent && c == candidate_a.to_plain() ); @@ -2482,8 +2482,8 @@ mod tests { assert_matches!( virtual_overseer.recv().await, - AllMessages::CandidateSelection( - CandidateSelectionMessage::Invalid(parent, c) + 
AllMessages::CollatorProtocol( + CollatorProtocolMessage::Invalid(parent, c) ) if parent == test_state.relay_parent && c == candidate.to_plain() => { } ); diff --git a/polkadot/node/core/candidate-selection/Cargo.toml b/polkadot/node/core/candidate-selection/Cargo.toml deleted file mode 100644 index be9be709da..0000000000 --- a/polkadot/node/core/candidate-selection/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "polkadot-node-core-candidate-selection" -version = "0.1.0" -authors = ["Parity Technologies "] -edition = "2018" - -[dependencies] -futures = "0.3.12" -tracing = "0.1.26" -thiserror = "1.0.23" - -sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } - -polkadot-primitives = { path = "../../../primitives" } -polkadot-node-subsystem = { path = "../../subsystem" } -polkadot-node-primitives = { path = "../../primitives" } -polkadot-node-subsystem-util = { path = "../../subsystem-util" } - -[dev-dependencies] -sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } -polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs deleted file mode 100644 index 37433ec634..0000000000 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ /dev/null @@ -1,715 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. 
- -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! The provisioner is responsible for assembling a relay chain block -//! from a set of available parachain candidates of its choice. - -#![deny(missing_docs, unused_crate_dependencies, unused_results)] - -use futures::{ - channel::{mpsc, oneshot}, - prelude::*, -}; -use sp_keystore::SyncCryptoStorePtr; -use polkadot_node_subsystem::{ - jaeger, PerLeafSpan, SubsystemSender, - errors::ChainApiError, - messages::{ - CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage, - RuntimeApiRequest, - }, -}; -use polkadot_node_subsystem_util::{ - self as util, request_from_runtime, request_validator_groups, JobSubsystem, - JobTrait, JobSender, Validator, metrics::{self, prometheus}, -}; -use polkadot_primitives::v1::{ - CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, BlockNumber, -}; -use polkadot_node_primitives::{SignedFullStatement, PoV}; -use std::{pin::Pin, sync::Arc}; -use thiserror::Error; - -const LOG_TARGET: &'static str = "parachain::candidate-selection"; - -/// A per-block job in the candidate selection subsystem. -pub struct CandidateSelectionJob { - assignment: ParaId, - receiver: mpsc::Receiver, - metrics: Metrics, - seconded_candidate: Option, -} - -/// Errors in the candidate selection subsystem. -#[derive(Debug, Error)] -pub enum Error { - /// An error in utilities. - #[error(transparent)] - Util(#[from] util::Error), - /// An error receiving on a oneshot channel. - #[error(transparent)] - OneshotRecv(#[from] oneshot::Canceled), - /// An error interacting with the chain API. - #[error(transparent)] - ChainApi(#[from] ChainApiError), -} - -macro_rules! 
try_runtime_api { - ($x: expr) => { - match $x { - Ok(x) => x, - Err(e) => { - tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Failed to fetch runtime API data for job", - ); - - // We can't do candidate selection work if we don't have the - // requisite runtime API data. But these errors should not take - // down the node. - return Ok(()); - } - } - } -} - -impl JobTrait for CandidateSelectionJob { - type ToJob = CandidateSelectionMessage; - type Error = Error; - type RunArgs = SyncCryptoStorePtr; - type Metrics = Metrics; - - const NAME: &'static str = "CandidateSelectionJob"; - - #[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] - fn run( - relay_parent: Hash, - span: Arc, - keystore: Self::RunArgs, - metrics: Self::Metrics, - receiver: mpsc::Receiver, - mut sender: JobSender, - ) -> Pin> + Send>> { - let span = PerLeafSpan::new(span, "candidate-selection"); - async move { - let _span = span.child("query-runtime") - .with_relay_parent(relay_parent) - .with_stage(jaeger::Stage::CandidateSelection); - let (groups, cores) = futures::try_join!( - request_validator_groups(relay_parent, &mut sender).await, - request_from_runtime( - relay_parent, - &mut sender, - |tx| RuntimeApiRequest::AvailabilityCores(tx), - ).await, - )?; - - let (validator_groups, group_rotation_info) = try_runtime_api!(groups); - let cores = try_runtime_api!(cores); - - drop(_span); - let _span = span.child("validator-construction") - .with_relay_parent(relay_parent) - .with_stage(jaeger::Stage::CandidateSelection); - - let n_cores = cores.len(); - - let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await { - Ok(validator) => validator, - Err(util::Error::NotAValidator) => return Ok(()), - Err(err) => return Err(Error::Util(err)), - }; - - let assignment_span = span.child("find-assignment") - .with_relay_parent(relay_parent) - .with_stage(jaeger::Stage::CandidateSelection); - - #[derive(Debug)] - enum 
AssignmentState { - Unassigned, - Scheduled(ParaId), - Occupied(BlockNumber), - Free, - } - - let mut assignment = AssignmentState::Unassigned; - - for (idx, core) in cores.into_iter().enumerate() { - let core_index = CoreIndex(idx as _); - let group_index = group_rotation_info.group_for_core(core_index, n_cores); - if let Some(g) = validator_groups.get(group_index.0 as usize) { - if g.contains(&validator.index()) { - match core { - CoreState::Scheduled(scheduled) => { - assignment = AssignmentState::Scheduled(scheduled.para_id); - } - CoreState::Occupied(occupied) => { - // Ignore prospective assignments on occupied cores - // for the time being. - assignment = AssignmentState::Occupied(occupied.occupied_since); - } - CoreState::Free => { - assignment = AssignmentState::Free; - } - } - break; - } - } - } - - let (assignment, assignment_span) = match assignment { - AssignmentState::Scheduled(assignment) => { - let assignment_span = assignment_span - .with_string_tag("assigned", "true") - .with_para_id(assignment); - - (assignment, assignment_span) - } - assignment => { - let _assignment_span = assignment_span.with_string_tag("assigned", "false"); - - let validator_index = validator.index(); - let validator_id = validator.id(); - - tracing::debug!( - target: LOG_TARGET, - ?relay_parent, - ?validator_index, - ?validator_id, - ?assignment, - "No assignment. Will not select candidate." 
- ); - - return Ok(()) - } - }; - - drop(assignment_span); - - CandidateSelectionJob::new(assignment, metrics, receiver) - .run_loop(&span, sender.subsystem_sender()) - .await - }.boxed() - } -} - -impl CandidateSelectionJob { - fn new( - assignment: ParaId, - metrics: Metrics, - receiver: mpsc::Receiver, - ) -> Self { - Self { - receiver, - metrics, - assignment, - seconded_candidate: None, - } - } - - async fn run_loop( - &mut self, - span: &jaeger::Span, - sender: &mut impl SubsystemSender, - ) -> Result<(), Error> { - let span = span.child("run-loop") - .with_stage(jaeger::Stage::CandidateSelection); - - loop { - match self.receiver.next().await { - Some(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator_id, - )) => { - let _span = span.child("handle-collation"); - self.handle_collation(sender, relay_parent, para_id, collator_id).await; - } - Some(CandidateSelectionMessage::Invalid( - _relay_parent, - candidate_receipt, - )) => { - let _span = span.child("handle-invalid") - .with_stage(jaeger::Stage::CandidateSelection) - .with_candidate(candidate_receipt.hash()) - .with_relay_parent(_relay_parent); - self.handle_invalid(sender, candidate_receipt).await; - } - Some(CandidateSelectionMessage::Seconded(relay_parent, statement)) => { - let _span = span.child("handle-seconded") - .with_stage(jaeger::Stage::CandidateSelection) - .with_candidate(statement.payload().candidate_hash()) - .with_relay_parent(relay_parent); - self.handle_seconded(sender, relay_parent, statement).await; - } - None => break, - } - } - - Ok(()) - } - - #[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))] - async fn handle_collation( - &mut self, - sender: &mut impl SubsystemSender, - relay_parent: Hash, - para_id: ParaId, - collator_id: CollatorId, - ) { - let _timer = self.metrics.time_handle_collation(); - - if self.assignment != para_id { - tracing::info!( - target: LOG_TARGET, - "Collator {:?} sent a collation outside of our 
assignment {:?}", - collator_id, - para_id, - ); - forward_invalidity_note(&collator_id, sender).await; - return; - } - - if self.seconded_candidate.is_none() { - let (candidate_receipt, pov) = - match get_collation( - relay_parent, - para_id, - collator_id.clone(), - sender, - ).await { - Ok(response) => response, - Err(err) => { - tracing::debug!( - target: LOG_TARGET, - err = ?err, - "failed to get collation from collator protocol subsystem", - ); - return; - } - }; - - second_candidate( - relay_parent, - candidate_receipt, - pov, - sender, - &self.metrics, - ).await; - self.seconded_candidate = Some(collator_id); - } - } - - #[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))] - async fn handle_invalid( - &mut self, - sender: &mut impl SubsystemSender, - candidate_receipt: CandidateReceipt, - ) { - let _timer = self.metrics.time_handle_invalid(); - - let received_from = match &self.seconded_candidate { - Some(peer) => peer, - None => { - tracing::warn!( - target: LOG_TARGET, - "received invalidity notice for a candidate we don't remember seconding" - ); - return; - } - }; - tracing::info!( - target: LOG_TARGET, - candidate_receipt = ?candidate_receipt, - "received invalidity note for candidate", - ); - - forward_invalidity_note(received_from, sender).await; - self.metrics.on_invalid_selection(); - } - - async fn handle_seconded( - &mut self, - sender: &mut impl SubsystemSender, - relay_parent: Hash, - statement: SignedFullStatement, - ) { - let received_from = match &self.seconded_candidate { - Some(peer) => peer, - None => { - tracing::warn!( - target: LOG_TARGET, - "received seconded notice for a candidate we don't remember seconding" - ); - return; - } - }; - tracing::debug!( - target: LOG_TARGET, - statement = ?statement, - "received seconded note for candidate", - ); - - sender - .send_message(CollatorProtocolMessage::NoteGoodCollation(received_from.clone()).into()) - .await; - - sender.send_message( - 
CollatorProtocolMessage::NotifyCollationSeconded( - received_from.clone(), - relay_parent, - statement - ).into() - ).await; - } -} - -// get a collation from the Collator Protocol subsystem -// -// note that this gets an owned clone of the sender; that's becuase unlike `forward_invalidity_note`, it's expected to take a while longer -#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))] -async fn get_collation( - relay_parent: Hash, - para_id: ParaId, - collator_id: CollatorId, - sender: &mut impl SubsystemSender, -) -> Result<(CandidateReceipt, PoV), Error> { - let (tx, rx) = oneshot::channel(); - sender - .send_message(CollatorProtocolMessage::FetchCollation( - relay_parent, - collator_id, - para_id, - tx, - ).into()) - .await; - - rx.await.map_err(Into::into) -} - -async fn second_candidate( - relay_parent: Hash, - candidate_receipt: CandidateReceipt, - pov: PoV, - sender: &mut impl SubsystemSender, - metrics: &Metrics, -) { - sender - .send_message(CandidateBackingMessage::Second( - relay_parent, - candidate_receipt, - pov, - ).into()) - .await; - - metrics.on_second(); -} - -async fn forward_invalidity_note( - received_from: &CollatorId, - sender: &mut impl SubsystemSender, -) { - sender - .send_message(CollatorProtocolMessage::ReportCollator(received_from.clone()).into()) - .await -} - -#[derive(Clone)] -struct MetricsInner { - seconds: prometheus::Counter, - invalid_selections: prometheus::Counter, - handle_collation: prometheus::Histogram, - handle_invalid: prometheus::Histogram, -} - -/// Candidate selection metrics. -#[derive(Default, Clone)] -pub struct Metrics(Option); - -impl Metrics { - fn on_second(&self) { - if let Some(metrics) = &self.0 { - metrics.seconds.inc(); - } - } - - fn on_invalid_selection(&self) { - if let Some(metrics) = &self.0 { - metrics.invalid_selections.inc(); - } - } - - /// Provide a timer for `handle_collation` which observes on drop. 
- fn time_handle_collation(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.handle_collation.start_timer()) - } - - /// Provide a timer for `handle_invalid` which observes on drop. - fn time_handle_invalid(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.handle_invalid.start_timer()) - } -} - -impl metrics::Metrics for Metrics { - fn try_register(registry: &prometheus::Registry) -> Result { - let metrics = MetricsInner { - seconds: prometheus::register( - prometheus::Counter::with_opts( - prometheus::Opts::new( - "candidate_selection_seconds_total", - "Number of Candidate Selection subsystem seconding events.", - ), - )?, - registry, - )?, - invalid_selections: prometheus::register( - prometheus::Counter::with_opts( - prometheus::Opts::new( - "candidate_selection_invalid_selections_total", - "Number of Candidate Selection subsystem seconding selections which proved to be invalid.", - ), - )?, - registry, - )?, - handle_collation: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts::new( - "parachain_candidate_selection_handle_collation", - "Time spent within `candidate_selection::handle_collation`", - ) - )?, - registry, - )?, - handle_invalid: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts::new( - "parachain_candidate_selection:handle_invalid", - "Time spent within `candidate_selection::handle_invalid`", - ) - )?, - registry, - )?, - }; - Ok(Metrics(Some(metrics))) - } -} - -/// The candidate selection subsystem. 
-pub type CandidateSelectionSubsystem = JobSubsystem; - -#[cfg(test)] -mod tests { - use super::*; - use futures::lock::Mutex; - use polkadot_node_primitives::BlockData; - use polkadot_node_subsystem::messages::AllMessages; - use sp_core::crypto::Public; - use std::sync::Arc; - - fn test_harness( - preconditions: Preconditions, - test: TestBuilder, - postconditions: Postconditions, - ) where - Preconditions: FnOnce(&mut CandidateSelectionJob), - TestBuilder: FnOnce(mpsc::Sender, mpsc::UnboundedReceiver) -> Test, - Test: Future, - Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>), - { - let (to_job_tx, to_job_rx) = mpsc::channel(0); - let (mut from_job_tx, from_job_rx) = polkadot_node_subsystem_test_helpers::sender_receiver(); - let mut job = CandidateSelectionJob { - assignment: 123.into(), - receiver: to_job_rx, - metrics: Default::default(), - seconded_candidate: None, - }; - - preconditions(&mut job); - let span = jaeger::Span::Disabled; - let (_, (job, job_result)) = futures::executor::block_on(future::join( - test(to_job_tx, from_job_rx), - async move { - let res = job.run_loop(&span, &mut from_job_tx).await; - drop(from_job_tx); - (job, res) - }, - )); - - postconditions(job, job_result); - } - - /// when nothing is seconded so far, the collation is fetched and seconded - #[test] - fn fetches_and_seconds_a_collation() { - let relay_parent = Hash::random(); - let para_id: ParaId = 123.into(); - let collator_id = CollatorId::from_slice(&(0..32).collect::>()); - let collator_id_clone = collator_id.clone(); - - let candidate_receipt = CandidateReceipt::default(); - let pov = PoV { - block_data: BlockData((0..32).cycle().take(256).collect()), - }; - - let was_seconded = Arc::new(Mutex::new(false)); - let was_seconded_clone = was_seconded.clone(); - - test_harness( - |_job| {}, - |mut to_job, mut from_job| async move { - to_job - .send(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator_id_clone.clone(), - )) - .await - 
.unwrap(); - std::mem::drop(to_job); - - while let Some(msg) = from_job.next().await { - match msg { - AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation( - got_relay_parent, - collator_id, - got_para_id, - return_sender, - )) => { - assert_eq!(got_relay_parent, relay_parent); - assert_eq!(got_para_id, para_id); - assert_eq!(collator_id, collator_id_clone); - - return_sender - .send((candidate_receipt.clone(), pov.clone())) - .unwrap(); - } - AllMessages::CandidateBacking(CandidateBackingMessage::Second( - got_relay_parent, - got_candidate_receipt, - got_pov, - )) => { - assert_eq!(got_relay_parent, relay_parent); - assert_eq!(got_candidate_receipt, candidate_receipt); - assert_eq!(got_pov, pov); - - *was_seconded_clone.lock().await = true; - } - other => panic!("unexpected message from job: {:?}", other), - } - } - }, - |job, job_result| { - assert!(job_result.is_ok()); - assert_eq!(job.seconded_candidate.unwrap(), collator_id); - }, - ); - - assert!(Arc::try_unwrap(was_seconded).unwrap().into_inner()); - } - - /// when something has been seconded, further collation notifications are ignored - #[test] - fn ignores_collation_notifications_after_the_first() { - let relay_parent = Hash::random(); - let para_id: ParaId = 123.into(); - let prev_collator_id = CollatorId::from_slice(&(0..32).rev().collect::>()); - let collator_id = CollatorId::from_slice(&(0..32).collect::>()); - let collator_id_clone = collator_id.clone(); - - let was_seconded = Arc::new(Mutex::new(false)); - let was_seconded_clone = was_seconded.clone(); - - test_harness( - |job| job.seconded_candidate = Some(prev_collator_id.clone()), - |mut to_job, mut from_job| async move { - to_job - .send(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator_id_clone, - )) - .await - .unwrap(); - std::mem::drop(to_job); - - while let Some(msg) = from_job.next().await { - match msg { - AllMessages::CandidateBacking(CandidateBackingMessage::Second( - _got_relay_parent, - 
_got_candidate_receipt, - _got_pov, - )) => { - *was_seconded_clone.lock().await = true; - } - other => panic!("unexpected message from job: {:?}", other), - } - } - }, - |job, job_result| { - assert!(job_result.is_ok()); - assert_eq!(job.seconded_candidate.unwrap(), prev_collator_id); - }, - ); - - assert!(!Arc::try_unwrap(was_seconded).unwrap().into_inner()); - } - - /// reports of invalidity from candidate backing are propagated - #[test] - fn propagates_invalidity_reports() { - let relay_parent = Hash::random(); - let collator_id = CollatorId::from_slice(&(0..32).collect::>()); - let collator_id_clone = collator_id.clone(); - - let candidate_receipt = CandidateReceipt::default(); - - let sent_report = Arc::new(Mutex::new(false)); - let sent_report_clone = sent_report.clone(); - - test_harness( - |job| job.seconded_candidate = Some(collator_id.clone()), - |mut to_job, mut from_job| async move { - to_job - .send(CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt)) - .await - .unwrap(); - - std::mem::drop(to_job); - - while let Some(msg) = from_job.next().await { - match msg { - AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator( - got_collator_id, - )) => { - assert_eq!(got_collator_id, collator_id_clone); - - *sent_report_clone.lock().await = true; - } - other => panic!("unexpected message from job: {:?}", other), - } - } - }, - |job, job_result| { - assert!(job_result.is_ok()); - assert_eq!(job.seconded_candidate.unwrap(), collator_id); - }, - ); - - assert!(Arc::try_unwrap(sent_report).unwrap().into_inner()); - } -} diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 1f8d89d4ea..1c348adc28 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -2365,7 +2365,6 @@ mod tests { match msg { AllMessages::CandidateValidation(_) => unreachable!("Not interested in network events"), AllMessages::CandidateBacking(_) => unreachable!("Not 
interested in network events"), - AllMessages::CandidateSelection(_) => unreachable!("Not interested in network events"), AllMessages::ChainApi(_) => unreachable!("Not interested in network events"), AllMessages::CollatorProtocol(_) => unreachable!("Not interested in network events"), AllMessages::StatementDistribution(_) => { cnt += 1; } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 87b88a15f4..a73490989c 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -570,30 +570,12 @@ async fn process_msg( } } } - FetchCollation(_, _, _, _) => { - tracing::warn!( - target: LOG_TARGET, - "FetchCollation message is not expected on the collator side of the protocol", - ); - } ReportCollator(_) => { tracing::warn!( target: LOG_TARGET, "ReportCollator message is not expected on the collator side of the protocol", ); } - NoteGoodCollation(_) => { - tracing::warn!( - target: LOG_TARGET, - "NoteGoodCollation message is not expected on the collator side of the protocol", - ); - } - NotifyCollationSeconded(_, _, _) => { - tracing::warn!( - target: LOG_TARGET, - "NotifyCollationSeconded message is not expected on the collator side of the protocol", - ); - } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, @@ -646,6 +628,7 @@ async fn process_msg( } } } + _ => {}, } Ok(()) diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs similarity index 78% rename from polkadot/node/network/collator-protocol/src/validator_side.rs rename to polkadot/node/network/collator-protocol/src/validator_side/mod.rs index 2a161c65d8..4e0e9266ab 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -19,7 +19,8 @@ use 
std::collections::hash_map::Entry; use std::time::{Duration, Instant}; use always_assert::never; use futures::{ - channel::oneshot, future::{BoxFuture, Either, Fuse, FusedFuture}, FutureExt, StreamExt, + channel::oneshot, future::{BoxFuture, Fuse, FusedFuture}, FutureExt, StreamExt, + stream::FuturesUnordered, select, }; use futures_timer::Delay; @@ -35,22 +36,22 @@ use polkadot_node_network_protocol::{ }, OurView, PeerId, UnifiedReputationChange as Rep, View, }; -use polkadot_node_primitives::{SignedFullStatement, Statement, PoV}; -use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_primitives::{SignedFullStatement, PoV}; +use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}}; use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId}; use polkadot_subsystem::{ jaeger, messages::{ - AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, IfDisconnected, - NetworkBridgeEvent, NetworkBridgeMessage, + AllMessages, CollatorProtocolMessage, IfDisconnected, + NetworkBridgeEvent, NetworkBridgeMessage, CandidateBackingMessage, }, FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, SubsystemSender, }; -use crate::error::Fatal; - use super::{modify_reputation, Result, LOG_TARGET}; +const COLLATION_FETCH_TIMEOUT: Duration = Duration::from_secs(2); + const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message"); /// Message could not be decoded properly. 
const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt"); @@ -439,6 +440,28 @@ impl ActiveParas { } } +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +struct PendingCollation { + relay_parent: Hash, + para_id: ParaId, + peer_id: PeerId, + commitments_hash: Option, +} + +impl PendingCollation { + fn new(relay_parent: Hash, para_id: &ParaId, peer_id: &PeerId) -> Self { + let commitments_hash = None; + Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash } + } +} + +type CollationEvent = (CollatorId, PendingCollation); + +type PendingCollationFetch = ( + CollationEvent, + Option> +); + /// All state relevant for the validator side of the protocol lives here. #[derive(Default)] struct State { @@ -456,13 +479,19 @@ struct State { /// For each relay parent and para id we may be connected to a number /// of collators each of those may have advertised a different collation. /// So we group such cases here. - requested_collations: HashMap<(Hash, ParaId, PeerId), PerRequest>, + requested_collations: HashMap, /// Metrics. metrics: Metrics, /// Span per relay parent. span_per_relay_parent: HashMap, + + /// Keep track of all fetch collation requests + collations: FuturesUnordered>, + + /// Keep track of all pending candidate collations + pending_candidates: HashMap, } // O(n) search for collator ID by iterating through the peers map. This should be fast enough @@ -484,23 +513,17 @@ async fn disconnect_peer(ctx: &mut impl SubsystemContext, peer_id: PeerId) { } /// Another subsystem has requested to fetch collations on a particular leaf for some para. 
-#[tracing::instrument(level = "trace", skip(ctx, state, tx), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, state, tx, pc), fields(subsystem = LOG_TARGET))] async fn fetch_collation( ctx: &mut Context, state: &mut State, - relay_parent: Hash, - collator_id: CollatorId, - para_id: ParaId, + pc: PendingCollation, tx: oneshot::Sender<(CandidateReceipt, PoV)> ) where Context: SubsystemContext { - let peer_id = match collator_peer_id(&state.peer_data, &collator_id) { - None => return, - Some(p) => p, - }; - + let PendingCollation { relay_parent, para_id, peer_id, .. } = pc; if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) { request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await; } @@ -537,33 +560,22 @@ where } /// Notify a collator that its collation got seconded. -#[tracing::instrument(level = "trace", skip(ctx, peer_data), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] async fn notify_collation_seconded( ctx: &mut impl SubsystemContext, - peer_data: &HashMap, - id: CollatorId, + peer_id: PeerId, relay_parent: Hash, statement: SignedFullStatement, ) { - if !matches!(statement.payload(), Statement::Seconded(_)) { - tracing::error!( - target: LOG_TARGET, - statement = ?statement, - "Notify collation seconded called with a wrong statement.", - ); - return; - } + let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into()); + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + vec![peer_id], + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await; - if let Some(peer_id) = collator_peer_id(peer_data, &id) { - let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into()); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( 
- vec![peer_id], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - )).await; - } + modify_reputation(ctx, peer_id, BENEFIT_NOTIFY_GOOD).await; } /// A peer's view has changed. A number of things should be done: @@ -579,7 +591,7 @@ async fn handle_peer_view_change( peer_data.update_view(view); state.requested_collations - .retain(|(rp, _, pid), _| pid != &peer_id || !peer_data.has_advertised(&rp)); + .retain(|pc, _| pc.peer_id != peer_id || !peer_data.has_advertised(&pc.relay_parent)); Ok(()) } @@ -612,13 +624,13 @@ where ); return; } - - if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) { + let pending_collation = PendingCollation::new(relay_parent, ¶_id, &peer_id); + if state.requested_collations.contains_key(&pending_collation) { tracing::warn!( target: LOG_TARGET, - peer_id = %peer_id, - %para_id, - ?relay_parent, + peer_id = %pending_collation.peer_id, + %pending_collation.para_id, + ?pending_collation.relay_parent, "collation has already been requested", ); return; @@ -640,7 +652,10 @@ where }), }; - state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), per_request); + state.requested_collations.insert( + PendingCollation::new(relay_parent, ¶_id, &peer_id), + per_request + ); tracing::debug!( target: LOG_TARGET, @@ -655,26 +670,6 @@ where ).await; } -/// Notify `CandidateSelectionSubsystem` that a collation has been advertised. -#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))] -async fn notify_candidate_selection( - ctx: &mut Context, - collator: CollatorId, - relay_parent: Hash, - para_id: ParaId, -) -where - Context: SubsystemContext -{ - ctx.send_message(AllMessages::CandidateSelection( - CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) - )).await; -} - /// Networking message has been received. 
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] async fn process_incoming_peer_message( @@ -688,7 +683,6 @@ where { use protocol_v1::CollatorProtocolMessage::*; use sp_runtime::traits::AppVerify; - match msg { Declare(collator_id, para_id, signature) => { if collator_peer_id(&state.peer_data, &collator_id).is_some() { @@ -739,7 +733,6 @@ where } AdvertiseCollation(relay_parent) => { let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation")); - if !state.view.contains(&relay_parent) { tracing::debug!( target: LOG_TARGET, @@ -761,7 +754,7 @@ where }; match peer_data.insert_advertisement(relay_parent, &state.view) { - Ok((collator_id, para_id)) => { + Ok((id, para_id)) => { tracing::debug!( target: LOG_TARGET, peer_id = ?origin, @@ -769,8 +762,23 @@ where ?relay_parent, "Received advertise collation", ); + let (tx, rx) = oneshot::channel::<( + CandidateReceipt, + PoV, + )>(); - notify_candidate_selection(ctx, collator_id, relay_parent, para_id).await; + + let pending_collation = PendingCollation::new( + relay_parent, + ¶_id, + &origin, + ); + fetch_collation(ctx, state, pending_collation.clone(), tx).await; + + let future = async move { + ((id, pending_collation), rx.timeout(COLLATION_FETCH_TIMEOUT).await) + }; + state.collations.push(Box::pin(future)); } Err(e) => { tracing::debug!( @@ -791,7 +799,6 @@ where peer_id = ?origin, "Unexpected `CollationSeconded` message, decreasing reputation", ); - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } } } @@ -805,7 +812,11 @@ async fn remove_relay_parent( relay_parent: Hash, ) -> Result<()> { state.requested_collations.retain(|k, _| { - k.0 != relay_parent + k.relay_parent != relay_parent + }); + + state.pending_candidates.retain(|k, _| { + k != &relay_parent }); Ok(()) } @@ -927,19 +938,9 @@ where "DistributeCollation message is not expected on the validator side of the protocol", ); } - FetchCollation(relay_parent, 
collator_id, para_id, tx) => { - let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("fetch-collation")); - fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await; - } ReportCollator(id) => { report_collator(ctx, &state.peer_data, id).await; } - NoteGoodCollation(id) => { - note_good_collation(ctx, &state.peer_data, id).await; - } - NotifyCollationSeconded(id, relay_parent, statement) => { - notify_collation_seconded(ctx, &state.peer_data, id, relay_parent, statement).await; - } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, @@ -960,6 +961,32 @@ where "CollationFetchingRequest message is not expected on the validator side of the protocol", ); } + Seconded(parent, stmt) => { + if let Some(collation_event) = state.pending_candidates.remove(&parent) { + let (collator_id, pending_collation) = collation_event; + let PendingCollation { relay_parent, peer_id, .. } = pending_collation; + note_good_collation(ctx, &state.peer_data, collator_id).await; + notify_collation_seconded(ctx, peer_id, relay_parent, stmt).await; + } else { + tracing::debug!( + target: LOG_TARGET, + relay_parent = ?parent, + "Collation has been seconded, but the relay parent is deactivated", + ); + } + } + Invalid(parent, candidate_receipt) => { + if match state.pending_candidates.get(&parent) { + Some(collation_event) + if Some(candidate_receipt.commitments_hash) == collation_event.1.commitments_hash + => true, + _ => false, + } { + if let Some((id, _)) = state.pending_candidates.remove(&parent) { + report_collator(ctx, &state.peer_data, id).await; + } + } + } } } @@ -990,6 +1017,7 @@ pub(crate) async fn run( let mut state = State { metrics, + ..Default::default() }; @@ -1001,57 +1029,87 @@ pub(crate) async fn run( futures::pin_mut!(next_inactivity_stream); loop { - let res = { - let s = futures::future::select(ctx.recv().fuse(), next_inactivity_stream.next().fuse()); - - if let Poll::Ready(res) = futures::poll!(s) { - Some(match 
res { - Either::Left((msg, _)) => Either::Left(msg.map_err(Fatal::SubsystemReceive)?), - Either::Right((_, _)) => Either::Right(()), - }) - } else { - None - } - }; - - match res { - Some(Either::Left(msg)) => { - tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message"); - - match msg { - Communication { msg } => process_msg( - &mut ctx, - &keystore, - msg, - &mut state, - ).await, - Signal(BlockFinalized(..)) => {} - Signal(ActiveLeaves(_)) => {} - Signal(Conclude) => { break } + select! { + res = ctx.recv().fuse() => { + match res { + Ok(Communication { msg }) => { + tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message"); + process_msg( + &mut ctx, + &keystore, + msg, + &mut state, + ).await; + } + Ok(Signal(Conclude)) => break, + _ => {}, } - - continue } - Some(Either::Right(())) => { + _ = next_inactivity_stream.next() => { disconnect_inactive_peers(&mut ctx, &eviction_policy, &state.peer_data).await; - continue } - None => {} + res = state.collations.next() => { + // If no prior collation for this relay parent has been seconded, then + // memoize the collation_event for that relay_parent, such that we may + // notify the collator of their successful second backing + if let Some((relay_parent, collation_event)) = match res { + Some( + (mut collation_event, Some(Ok((candidate_receipt, pov)))) + ) => { + let relay_parent = &collation_event.1.relay_parent; + // Verify whether this relay_parent has already been seconded + if state.pending_candidates.get(relay_parent).is_none() { + // Forward Candidate Receipt and PoV to candidate backing [CB] + collation_event.1 + .commitments_hash = Some(candidate_receipt.commitments_hash); + ctx.send_message( + CandidateBackingMessage::Second( + relay_parent.clone(), + candidate_receipt, + pov, + ).into() + ).await; + Some((relay_parent.clone(), collation_event)) + } else { + tracing::debug!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + collator_id = ?collation_event.0, + "Collation for 
this relay parent has already been seconded.", + ); + None + } + } + Some( + (collation_event, _) + ) => { + let (id, pending_collation) = collation_event; + tracing::debug!( + target: LOG_TARGET, + relay_parent = ?pending_collation.relay_parent, + collator_id = ?id, + "Collation fetching has timed out.", + ); + None + } + _ => None, + } { + state.pending_candidates.insert(relay_parent, collation_event); + } + } } let mut retained_requested = HashSet::new(); - for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() { + for (pending_collation, per_req) in state.requested_collations.iter_mut() { // Despite the await, this won't block on the response itself. let finished = poll_collation_response( - &mut ctx, &state.metrics, &state.span_per_relay_parent, - hash, para_id, peer_id, per_req + &mut ctx, &state.metrics, &state.span_per_relay_parent, pending_collation, per_req, ).await; if !finished { - retained_requested.insert((*hash, *para_id, *peer_id)); + retained_requested.insert(pending_collation.clone()); } } state.requested_collations.retain(|k, _| retained_requested.contains(k)); - futures::pending!(); } Ok(()) } @@ -1082,9 +1140,7 @@ async fn poll_collation_response( ctx: &mut Context, metrics: &Metrics, spans: &HashMap, - hash: &Hash, - para_id: &ParaId, - peer_id: &PeerId, + pending_collation: &PendingCollation, per_req: &mut PerRequest ) -> bool @@ -1100,7 +1156,7 @@ where } if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) { - let _span = spans.get(&hash) + let _span = spans.get(&pending_collation.relay_parent) .map(|s| s.child("received-collation")); let _timer = metrics.time_handle_collation_request_result(); @@ -1111,20 +1167,24 @@ where Err(RequestError::InvalidResponse(err)) => { tracing::warn!( target: LOG_TARGET, - hash = ?hash, - para_id = ?para_id, - peer_id = ?peer_id, + hash = ?pending_collation.relay_parent, + para_id = ?pending_collation.para_id, + peer_id = ?pending_collation.peer_id, err = ?err, 
"Collator provided response that could not be decoded" ); - modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await; + modify_reputation( + ctx, + pending_collation.peer_id.clone(), + COST_CORRUPTED_MESSAGE + ).await; } Err(RequestError::NetworkError(err)) => { tracing::warn!( target: LOG_TARGET, - hash = ?hash, - para_id = ?para_id, - peer_id = ?peer_id, + hash = ?pending_collation.relay_parent, + para_id = ?pending_collation.para_id, + peer_id = ?pending_collation.peer_id, err = ?err, "Fetching collation failed due to network error" ); @@ -1132,44 +1192,43 @@ where // sensbile. In theory this could be exploited, by DoSing this node, // which would result in reduced reputation for proper nodes, but the // same can happen for penalities on timeouts, which we also have. - modify_reputation(ctx, *peer_id, COST_NETWORK_ERROR).await; + modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await; } Err(RequestError::Canceled(_)) => { tracing::warn!( target: LOG_TARGET, - hash = ?hash, - para_id = ?para_id, - peer_id = ?peer_id, + hash = ?pending_collation.relay_parent, + para_id = ?pending_collation.para_id, + peer_id = ?pending_collation.peer_id, "Request timed out" ); // A minor decrease in reputation for any network failure seems // sensbile. In theory this could be exploited, by DoSing this node, // which would result in reduced reputation for proper nodes, but the // same can happen for penalities on timeouts, which we also have. 
- modify_reputation(ctx, *peer_id, COST_REQUEST_TIMED_OUT).await; + modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT).await; } Ok(CollationFetchingResponse::Collation(receipt, _)) - if receipt.descriptor().para_id != *para_id => + if receipt.descriptor().para_id != pending_collation.para_id => { tracing::debug!( target: LOG_TARGET, - expected_para_id = ?para_id, + expected_para_id = ?pending_collation.para_id, got_para_id = ?receipt.descriptor().para_id, - peer_id = ?peer_id, + peer_id = ?pending_collation.peer_id, "Got wrong para ID for requested collation." ); - modify_reputation(ctx, *peer_id, COST_WRONG_PARA).await; + modify_reputation(ctx, pending_collation.peer_id.clone(), COST_WRONG_PARA).await; } Ok(CollationFetchingResponse::Collation(receipt, pov)) => { tracing::debug!( target: LOG_TARGET, - para_id = %para_id, - hash = ?hash, + para_id = %pending_collation.para_id, + hash = ?pending_collation.relay_parent, candidate_hash = ?receipt.hash(), "Received collation", ); - // Actual sending: let _span = jaeger::Span::new(&pov, "received-collation"); let (mut tx, _) = oneshot::channel(); @@ -1179,9 +1238,9 @@ where if let Err(_) = result { tracing::warn!( target: LOG_TARGET, - hash = ?hash, - para_id = ?para_id, - peer_id = ?peer_id, + hash = ?pending_collation.relay_parent, + para_id = ?pending_collation.para_id, + peer_id = ?pending_collation.peer_id, "Sending response back to requester failed (receiving side closed)" ); } else { @@ -1505,16 +1564,20 @@ mod tests { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, pair.public()); + let req = reqs.into_iter().next() + 
.expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, test_state.relay_parent); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + } + _ => panic!("Unexpected request"), + } }); + virtual_overseer }); } @@ -1606,20 +1669,6 @@ mod tests { } ); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NoteGoodCollation(test_state.collators[1].public()), - ).await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep), - ) => { - assert_eq!(peer, peer_c); - assert_eq!(rep, BENEFIT_NOTIFY_GOOD); - } - ); virtual_overseer }); } @@ -1677,9 +1726,9 @@ mod tests { // A test scenario that takes the following steps // - Two collators connect, declare themselves and advertise a collation relevant to - // our view. + // our view. // - This results subsystem acting upon these advertisements and issuing two messages to - // the CandidateBacking subsystem. + // the CandidateBacking subsystem. // - CandidateBacking requests both of the collations. // - Collation protocol requests these collations. // - The collations are sent to it. 
@@ -1767,19 +1816,6 @@ mod tests { ) ).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) - ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, test_state.collators[0].public()); - }); - overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( @@ -1792,43 +1828,6 @@ mod tests { ) ).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) - ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, test_state.collators[1].public()); - }); - - let (tx_0, rx_0) = oneshot::channel(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::FetchCollation( - test_state.relay_parent, - test_state.collators[0].public(), - test_state.chain_ids[0], - tx_0, - ) - ).await; - - let (tx_1, rx_1) = oneshot::channel(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::FetchCollation( - test_state.relay_parent, - test_state.collators[1].public(), - test_state.chain_ids[0], - tx_1, - ) - ).await; - let response_channel = assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) @@ -1858,7 +1857,7 @@ mod tests { ).encode() )).expect("Sending response should succeed"); - let response_channel = assert_matches!( + let _ = assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { @@ -1875,24 +1874,6 @@ mod tests { } }); - let mut candidate_b = CandidateReceipt::default(); - candidate_b.descriptor.para_id = 
test_state.chain_ids[0]; - candidate_b.descriptor.relay_parent = test_state.relay_parent; - - response_channel.send(Ok( - CollationFetchingResponse::Collation( - candidate_b.clone(), - PoV { - block_data: BlockData(vec![1, 2, 3]), - }, - ).encode() - )).expect("Sending response should succeed"); - - let collation_0 = rx_0.await.unwrap(); - let collation_1 = rx_1.await.unwrap(); - - assert_eq!(collation_0.0, candidate_a); - assert_eq!(collation_1.0, candidate_b); virtual_overseer }); } @@ -1958,20 +1939,35 @@ mod tests { ) ).await; - assert_matches!( + let _ = assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, pair.public()); + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, test_state.relay_parent); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + req.pending_response + } + _ => panic!("Unexpected request"), + } }); - Delay::new(ACTIVITY_TIMEOUT * 2).await; + Delay::new(ACTIVITY_TIMEOUT * 3).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REQUEST_TIMED_OUT); + } + ); assert_matches!( overseer_recv(&mut virtual_overseer).await, @@ -2057,15 +2053,18 @@ mod tests { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) + 
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, pair.public()); + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, hash_a); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + } + _ => panic!("Unexpected request"), + } }); Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; @@ -2084,15 +2083,29 @@ mod tests { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REQUEST_TIMED_OUT); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, hash_b); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, pair.public()); + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, hash_b); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + } + _ => panic!("Unexpected request"), + } }); Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; @@ -2111,19 +2124,44 @@ mod tests { assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( - relay_parent, - para_id, - collator, - ) + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, 
COST_REQUEST_TIMED_OUT); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) ) => { - assert_eq!(relay_parent, hash_c); - assert_eq!(para_id, test_state.chain_ids[0]); - assert_eq!(collator, pair.public()); + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, hash_c); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + } + _ => panic!("Unexpected request"), + } }); Delay::new(ACTIVITY_TIMEOUT * 3 / 2).await; + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REQUEST_TIMED_OUT); + } + ); + assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( @@ -2327,4 +2365,190 @@ mod tests { virtual_overseer }) } + + // A test scenario that takes the following steps + // - Two collators connect, declare themselves and advertise a collation relevant to + // our view. + // - This results subsystem acting upon these advertisements and issuing two messages to + // the CandidateBacking subsystem. + // - CandidateBacking requests both of the collations. + // - Collation protocol requests these collations. + // - The collations are sent to it. + // - Collations are fetched correctly. 
+ #[test] + fn seconding_works() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) + ), + ).await; + + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b, + ObservedRole::Full, + None, + ), + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_c, + ObservedRole::Full, + None, + ), + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[0].public(), + test_state.chain_ids[0], + test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)), + ) + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_c.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[1].public(), + test_state.chain_ids[0], + test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)), + ) + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + ) + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + 
NetworkBridgeEvent::PeerMessage( + peer_c.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + ) + ) + ) + ).await; + + let response_channel = assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) + ) => { + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, test_state.relay_parent); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + req.pending_response + } + _ => panic!("Unexpected request"), + } + }); + + let mut candidate_a = CandidateReceipt::default(); + // Memoize PoV data to ensure we receive the right one + let pov = PoV { + block_data: BlockData(vec![1, 2, 3, 4, 5]), + }; + candidate_a.descriptor.para_id = test_state.chain_ids[0]; + candidate_a.descriptor.relay_parent = test_state.relay_parent; + response_channel.send(Ok( + CollationFetchingResponse::Collation( + candidate_a.clone(), + pov.clone(), + ).encode() + )).expect("Sending response should succeed"); + + let response_channel = assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) + ) => { + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, test_state.relay_parent); + assert_eq!(payload.para_id, test_state.chain_ids[0]); + req.pending_response + } + _ => panic!("Unexpected request"), + } + }); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateBacking(CandidateBackingMessage::Second(relay_parent, candidate_receipt, incoming_pov) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + 
assert_eq!(candidate_receipt.descriptor.para_id, test_state.chain_ids[0]); + assert_eq!(incoming_pov, pov); + }); + + let mut candidate_b = CandidateReceipt::default(); + candidate_b.descriptor.para_id = test_state.chain_ids[0]; + candidate_b.descriptor.relay_parent = test_state.relay_parent; + + // Send second collation to ensure first collation gets seconded + response_channel.send(Ok( + CollationFetchingResponse::Collation( + candidate_b.clone(), + PoV { + block_data: BlockData(vec![]), + }, + ).encode() + )).expect("Sending response should succeed after seconding"); + + // Ensure we don't receive any message related to candidate backing + // All Peers should get disconnected after successful Candidate Backing Message + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(_, _) + ) => {}); + + virtual_overseer + }); + } } diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 75e9a88052..5d15bbde59 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -294,8 +294,9 @@ pub mod v1 { use polkadot_primitives::v1::{ CandidateHash, CandidateIndex, CollatorId, CollatorSignature, CompactStatement, Hash, Id as ParaId, UncheckedSignedAvailabilityBitfield, - ValidatorIndex, ValidatorSignature + ValidatorIndex, ValidatorSignature, }; + use polkadot_node_primitives::{ approval::{IndirectAssignmentCert, IndirectSignedApprovalVote}, UncheckedSignedFullStatement, diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 2c656b6396..b5c2523aef 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -82,7 +82,7 @@ use sp_api::{ApiExt, ProvideRuntimeApi}; use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, - CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage, + ChainApiMessage, 
StatementDistributionMessage, AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, ProvisionerMessage, RuntimeApiMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, @@ -148,7 +148,7 @@ impl HeadSupportsParachains for Arc where /// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. #[derive(Debug, Clone, AllSubsystemsGen)] pub struct AllSubsystems< - CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), + CV = (), CB = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (), GS = (), > { @@ -156,8 +156,6 @@ pub struct AllSubsystems< pub candidate_validation: CV, /// A candidate backing subsystem. pub candidate_backing: CB, - /// A candidate selection subsystem. - pub candidate_selection: CS, /// A statement distribution subsystem. pub statement_distribution: SD, /// An availability distribution subsystem. @@ -190,8 +188,8 @@ pub struct AllSubsystems< pub gossip_support: GS, } -impl - AllSubsystems +impl + AllSubsystems { /// Create a new instance of [`AllSubsystems`]. 
/// @@ -223,12 +221,10 @@ impl DummySubsystem, DummySubsystem, DummySubsystem, - DummySubsystem, > { AllSubsystems { candidate_validation: DummySubsystem, candidate_backing: DummySubsystem, - candidate_selection: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, availability_recovery: DummySubsystem, @@ -247,11 +243,10 @@ impl } } - fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> { + fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> { AllSubsystems { candidate_validation: &self.candidate_validation, candidate_backing: &self.candidate_backing, - candidate_selection: &self.candidate_selection, statement_distribution: &self.statement_distribution, availability_distribution: &self.availability_distribution, availability_recovery: &self.availability_recovery, @@ -274,7 +269,6 @@ impl -> AllSubsystems< >::Output, >::Output, - >::Output, >::Output, >::Output, >::Output, @@ -294,7 +288,6 @@ impl where M: MapSubsystem, M: MapSubsystem, - M: MapSubsystem, M: MapSubsystem, M: MapSubsystem, M: MapSubsystem, @@ -314,7 +307,6 @@ impl AllSubsystems { candidate_validation: m.map_subsystem(self.candidate_validation), candidate_backing: m.map_subsystem(self.candidate_backing), - candidate_selection: m.map_subsystem(self.candidate_selection), statement_distribution: m.map_subsystem(self.statement_distribution), availability_distribution: m.map_subsystem(self.availability_distribution), availability_recovery: m.map_subsystem(self.availability_recovery), @@ -338,7 +330,7 @@ type AllSubsystemsSame = AllSubsystems< T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, - T, T, T, + T, T, >; /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. 
@@ -546,7 +538,6 @@ fn make_packet(signals_received: usize, message: T) -> MessagePacket { struct ChannelsOut { candidate_validation: metered::MeteredSender>, candidate_backing: metered::MeteredSender>, - candidate_selection: metered::MeteredSender>, statement_distribution: metered::MeteredSender>, availability_distribution: metered::MeteredSender>, availability_recovery: metered::MeteredSender>, @@ -565,7 +556,6 @@ struct ChannelsOut { candidate_validation_unbounded: metered::UnboundedMeteredSender>, candidate_backing_unbounded: metered::UnboundedMeteredSender>, - candidate_selection_unbounded: metered::UnboundedMeteredSender>, statement_distribution_unbounded: metered::UnboundedMeteredSender>, availability_distribution_unbounded: metered::UnboundedMeteredSender>, availability_recovery_unbounded: metered::UnboundedMeteredSender>, @@ -596,9 +586,6 @@ impl ChannelsOut { AllMessages::CandidateBacking(msg) => { self.candidate_backing.send(make_packet(signals_received, msg)).await }, - AllMessages::CandidateSelection(msg) => { - self.candidate_selection.send(make_packet(signals_received, msg)).await - }, AllMessages::StatementDistribution(msg) => { self.statement_distribution.send(make_packet(signals_received, msg)).await }, @@ -671,11 +658,6 @@ impl ChannelsOut { .unbounded_send(make_packet(signals_received, msg)) .map_err(|e| e.into_send_error()) }, - AllMessages::CandidateSelection(msg) => { - self.candidate_selection_unbounded - .unbounded_send(make_packet(signals_received, msg)) - .map_err(|e| e.into_send_error()) - }, AllMessages::StatementDistribution(msg) => { self.statement_distribution_unbounded .unbounded_send(make_packet(signals_received, msg)) @@ -1058,7 +1040,6 @@ pub struct Overseer { subsystems: AllSubsystems< OverseenSubsystem, OverseenSubsystem, - OverseenSubsystem, OverseenSubsystem, OverseenSubsystem, OverseenSubsystem, @@ -1379,9 +1360,9 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - all_subsystems: 
AllSubsystems, + all_subsystems: AllSubsystems, prometheus_registry: Option<&prometheus::Registry>, supports_parachains: SupportsParachains, mut s: S, @@ -1389,7 +1370,6 @@ where where CV: Subsystem> + Send, CB: Subsystem> + Send, - CS: Subsystem> + Send, SD: Subsystem> + Send, AD: Subsystem> + Send, AR: Subsystem> + Send, @@ -1422,8 +1402,6 @@ where = metered::channel(CHANNEL_CAPACITY); let (candidate_backing_bounded_tx, candidate_backing_bounded_rx) = metered::channel(CHANNEL_CAPACITY); - let (candidate_selection_bounded_tx, candidate_selection_bounded_rx) - = metered::channel(CHANNEL_CAPACITY); let (statement_distribution_bounded_tx, statement_distribution_bounded_rx) = metered::channel(CHANNEL_CAPACITY); let (availability_distribution_bounded_tx, availability_distribution_bounded_rx) @@ -1459,8 +1437,6 @@ where = metered::unbounded(); let (candidate_backing_unbounded_tx, candidate_backing_unbounded_rx) = metered::unbounded(); - let (candidate_selection_unbounded_tx, candidate_selection_unbounded_rx) - = metered::unbounded(); let (statement_distribution_unbounded_tx, statement_distribution_unbounded_rx) = metered::unbounded(); let (availability_distribution_unbounded_tx, availability_distribution_unbounded_rx) @@ -1495,7 +1471,6 @@ where let channels_out = ChannelsOut { candidate_validation: candidate_validation_bounded_tx.clone(), candidate_backing: candidate_backing_bounded_tx.clone(), - candidate_selection: candidate_selection_bounded_tx.clone(), statement_distribution: statement_distribution_bounded_tx.clone(), availability_distribution: availability_distribution_bounded_tx.clone(), availability_recovery: availability_recovery_bounded_tx.clone(), @@ -1514,7 +1489,6 @@ where candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), - candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(), statement_distribution_unbounded: 
statement_distribution_unbounded_tx.clone(), availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(), availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(), @@ -1558,19 +1532,6 @@ where TaskKind::Regular, )?; - let candidate_selection_subsystem = spawn( - &mut s, - candidate_selection_bounded_tx, - stream::select(candidate_selection_bounded_rx, candidate_selection_unbounded_rx), - candidate_selection_unbounded_tx.meter().clone(), - channels_out.clone(), - to_overseer_tx.clone(), - all_subsystems.candidate_selection, - &metrics, - &mut running_subsystems, - TaskKind::Regular, - )?; - let statement_distribution_subsystem = spawn( &mut s, statement_distribution_bounded_tx, @@ -1777,7 +1738,6 @@ where let subsystems = AllSubsystems { candidate_validation: candidate_validation_subsystem, candidate_backing: candidate_backing_subsystem, - candidate_selection: candidate_selection_subsystem, statement_distribution: statement_distribution_subsystem, availability_distribution: availability_distribution_subsystem, availability_recovery: availability_recovery_subsystem, @@ -1853,7 +1813,6 @@ where async fn stop(mut self) { let _ = self.subsystems.candidate_validation.send_signal(OverseerSignal::Conclude).await; let _ = self.subsystems.candidate_backing.send_signal(OverseerSignal::Conclude).await; - let _ = self.subsystems.candidate_selection.send_signal(OverseerSignal::Conclude).await; let _ = self.subsystems.statement_distribution.send_signal(OverseerSignal::Conclude).await; let _ = self.subsystems.availability_distribution.send_signal(OverseerSignal::Conclude).await; let _ = self.subsystems.availability_recovery.send_signal(OverseerSignal::Conclude).await; @@ -2036,7 +1995,6 @@ where async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { self.subsystems.candidate_validation.send_signal(signal.clone()).await?; self.subsystems.candidate_backing.send_signal(signal.clone()).await?; - 
self.subsystems.candidate_selection.send_signal(signal.clone()).await?; self.subsystems.statement_distribution.send_signal(signal.clone()).await?; self.subsystems.availability_distribution.send_signal(signal.clone()).await?; self.subsystems.availability_recovery.send_signal(signal.clone()).await?; @@ -2066,9 +2024,6 @@ where AllMessages::CandidateBacking(msg) => { self.subsystems.candidate_backing.send_message(msg).await?; }, - AllMessages::CandidateSelection(msg) => { - self.subsystems.candidate_selection.send_message(msg).await?; - }, AllMessages::StatementDistribution(msg) => { self.subsystems.statement_distribution.send_message(msg).await?; }, @@ -3092,10 +3047,6 @@ mod tests { CandidateBackingMessage::GetBackedCandidates(Default::default(), Vec::new(), sender) } - fn test_candidate_selection_msg() -> CandidateSelectionMessage { - CandidateSelectionMessage::default() - } - fn test_chain_api_msg() -> ChainApiMessage { let (sender, _) = oneshot::channel(); ChainApiMessage::FinalizedBlockNumber(sender) @@ -3177,7 +3128,7 @@ mod tests { // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. 
#[test] fn overseer_all_subsystems_receive_signals_and_messages() { - const NUM_SUBSYSTEMS: usize = 18; + const NUM_SUBSYSTEMS: usize = 17; // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3; @@ -3196,7 +3147,6 @@ mod tests { let all_subsystems = AllSubsystems { candidate_validation: subsystem.clone(), candidate_backing: subsystem.clone(), - candidate_selection: subsystem.clone(), collation_generation: subsystem.clone(), collator_protocol: subsystem.clone(), statement_distribution: subsystem.clone(), @@ -3235,7 +3185,6 @@ mod tests { // except for BitfieldSigning and GossipSupport as the messages are not instantiable handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; - handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await; handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await; handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; @@ -3285,7 +3234,6 @@ mod tests { fn context_holds_onto_message_until_enough_signals_received() { let (candidate_validation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (candidate_backing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); - let (candidate_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (statement_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (availability_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (availability_recovery_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); @@ -3304,7 +3252,6 @@ mod tests { let (candidate_validation_unbounded_tx, _) = metered::unbounded(); let (candidate_backing_unbounded_tx, _) = metered::unbounded(); - let 
(candidate_selection_unbounded_tx, _) = metered::unbounded(); let (statement_distribution_unbounded_tx, _) = metered::unbounded(); let (availability_distribution_unbounded_tx, _) = metered::unbounded(); let (availability_recovery_unbounded_tx, _) = metered::unbounded(); @@ -3324,7 +3271,6 @@ mod tests { let channels_out = ChannelsOut { candidate_validation: candidate_validation_bounded_tx.clone(), candidate_backing: candidate_backing_bounded_tx.clone(), - candidate_selection: candidate_selection_bounded_tx.clone(), statement_distribution: statement_distribution_bounded_tx.clone(), availability_distribution: availability_distribution_bounded_tx.clone(), availability_recovery: availability_recovery_bounded_tx.clone(), @@ -3343,7 +3289,6 @@ mod tests { candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), - candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(), statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(), availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(), availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(), diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 52578716aa..1ede8c5a9b 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -97,7 +97,6 @@ polkadot-node-collation-generation = { path = "../collation-generation", optiona polkadot-node-core-av-store = { path = "../core/av-store", optional = true } polkadot-node-core-backing = { path = "../core/backing", optional = true } polkadot-node-core-bitfield-signing = { path = "../core/bitfield-signing", optional = true } -polkadot-node-core-candidate-selection = { path = "../core/candidate-selection", optional = true } polkadot-node-core-candidate-validation = { path = "../core/candidate-validation", optional = true } polkadot-node-core-chain-api = { path = 
"../core/chain-api", optional = true } polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true } @@ -125,7 +124,6 @@ full-node = [ "polkadot-node-collation-generation", "polkadot-node-core-backing", "polkadot-node-core-bitfield-signing", - "polkadot-node-core-candidate-selection", "polkadot-node-core-candidate-validation", "polkadot-node-core-chain-api", "polkadot-node-core-provisioner", diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 66c5be840f..eab6442fd1 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -447,7 +447,6 @@ where use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem; use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem; use polkadot_node_core_backing::CandidateBackingSubsystem; - use polkadot_node_core_candidate_selection::CandidateSelectionSubsystem; use polkadot_node_core_candidate_validation::CandidateValidationSubsystem; use polkadot_node_core_chain_api::ChainApiSubsystem; use polkadot_node_collation_generation::CollationGenerationSubsystem; @@ -486,11 +485,6 @@ where keystore.clone(), Metrics::register(registry)?, ), - candidate_selection: CandidateSelectionSubsystem::new( - spawner.clone(), - keystore.clone(), - Metrics::register(registry)?, - ), candidate_validation: CandidateValidationSubsystem::with_config( candidate_validation_config, Metrics::register(registry)?, diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 44d2d92288..f5333f72aa 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -362,8 +362,8 @@ mod tests { use super::*; use polkadot_overseer::{Overseer, HeadSupportsParachains, AllSubsystems}; use futures::executor::block_on; - use polkadot_node_subsystem::messages::CandidateSelectionMessage; use polkadot_primitives::v1::Hash; + use 
polkadot_node_subsystem::messages::CollatorProtocolMessage; struct AlwaysSupportsParachains; impl HeadSupportsParachains for AlwaysSupportsParachains { @@ -374,7 +374,7 @@ mod tests { fn forward_subsystem_works() { let spawner = sp_core::testing::TaskExecutor::new(); let (tx, rx) = mpsc::channel(2); - let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_selection(ForwardSubsystem(tx)); + let all_subsystems = AllSubsystems::<()>::dummy().replace_collator_protocol(ForwardSubsystem(tx)); let (overseer, mut handler) = Overseer::new( Vec::new(), all_subsystems, @@ -385,7 +385,7 @@ mod tests { spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); - block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default()))); - assert!(matches!(block_on(rx.into_future()).0.unwrap(), CandidateSelectionMessage::Invalid(_, _))); + block_on(handler.send_msg(CollatorProtocolMessage::CollateOn(Default::default()))); + assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_))); } } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 0664d70438..27433a3baa 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -867,7 +867,7 @@ mod tests { use thiserror::Error; use polkadot_node_jaeger as jaeger; use polkadot_node_subsystem::{ - messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, + messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, ActivatedLeaf, LeafStatus, }; use assert_matches::assert_matches; @@ -884,8 +884,8 @@ mod tests { // job structs are constructed within JobTrait::run // most will want to retain the sender and receiver, as well as whatever other data they like - struct FakeCandidateSelectionJob { - receiver: mpsc::Receiver, + struct FakeCollatorProtocolJob { + 
receiver: mpsc::Receiver, } // Error will mostly be a wrapper to make the try operator more convenient; @@ -897,13 +897,13 @@ mod tests { Sending(#[from]mpsc::SendError), } - impl JobTrait for FakeCandidateSelectionJob { - type ToJob = CandidateSelectionMessage; + impl JobTrait for FakeCollatorProtocolJob { + type ToJob = CollatorProtocolMessage; type Error = Error; type RunArgs = bool; type Metrics = (); - const NAME: &'static str = "FakeCandidateSelectionJob"; + const NAME: &'static str = "FakeCollatorProtocolJob"; /// Run a job for the parent block indicated // @@ -913,14 +913,14 @@ mod tests { _: Arc, run_args: Self::RunArgs, _metrics: Self::Metrics, - receiver: mpsc::Receiver, + receiver: mpsc::Receiver, mut sender: JobSender, ) -> Pin> + Send>> { async move { - let job = FakeCandidateSelectionJob { receiver }; + let job = FakeCollatorProtocolJob { receiver }; if run_args { - sender.send_message(CandidateSelectionMessage::Invalid( + sender.send_message(CollatorProtocolMessage::Invalid( Default::default(), Default::default(), ).into()).await; @@ -934,7 +934,7 @@ mod tests { } } - impl FakeCandidateSelectionJob { + impl FakeCollatorProtocolJob { async fn run_loop(mut self) -> Result<(), Error> { loop { match self.receiver.next().await { @@ -950,11 +950,11 @@ mod tests { } // with the job defined, it's straightforward to get a subsystem implementation. 
- type FakeCandidateSelectionSubsystem = - JobSubsystem; + type FakeCollatorProtocolSubsystem = + JobSubsystem; // this type lets us pretend to be the overseer - type OverseerHandle = test_helpers::TestSubsystemContextHandle; + type OverseerHandle = test_helpers::TestSubsystemContextHandle; fn test_harness>( run_args: bool, @@ -971,7 +971,7 @@ mod tests { let pool = sp_core::testing::TaskExecutor::new(); let (context, overseer_handle) = make_subsystem_context(pool.clone()); - let subsystem = FakeCandidateSelectionSubsystem::new( + let subsystem = FakeCollatorProtocolSubsystem::new( pool, run_args, (), @@ -1005,7 +1005,7 @@ mod tests { .await; assert_matches!( overseer_handle.recv().await, - AllMessages::CandidateSelection(_) + AllMessages::CollatorProtocol(_) ); overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( @@ -1045,7 +1045,7 @@ mod tests { // the subsystem is still alive assert_matches!( overseer_handle.recv().await, - AllMessages::CandidateSelection(_) + AllMessages::CollatorProtocol(_) ); overseer_handle @@ -1057,11 +1057,11 @@ mod tests { #[test] fn test_subsystem_impl_and_name_derivation() { let pool = sp_core::testing::TaskExecutor::new(); - let (context, _) = make_subsystem_context::(pool.clone()); + let (context, _) = make_subsystem_context::(pool.clone()); let SpawnedSubsystem { name, .. } = - FakeCandidateSelectionSubsystem::new(pool, false, ()).start(context); - assert_eq!(name, "FakeCandidateSelection"); + FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context); + assert_eq!(name, "FakeCollatorProtocol"); } diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index cb9a27ea89..0bc8896466 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -64,37 +64,6 @@ pub trait BoundToRelayParent { fn relay_parent(&self) -> Hash; } -/// Messages received by the Candidate Selection subsystem. 
-#[derive(Debug)] -pub enum CandidateSelectionMessage { - /// A candidate collation can be fetched from a collator and should be considered for seconding. - Collation(Hash, ParaId, CollatorId), - /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. - /// - /// The hash is the relay parent. - Invalid(Hash, CandidateReceipt), - /// The candidate we recommended to be seconded was validated successfully. - /// - /// The hash is the relay parent. - Seconded(Hash, SignedFullStatement), -} - -impl BoundToRelayParent for CandidateSelectionMessage { - fn relay_parent(&self) -> Hash { - match self { - Self::Collation(hash, ..) => *hash, - Self::Invalid(hash, _) => *hash, - Self::Seconded(hash, _) => *hash, - } - } -} - -impl Default for CandidateSelectionMessage { - fn default() -> Self { - CandidateSelectionMessage::Invalid(Default::default(), Default::default()) - } -} - /// Messages received by the Candidate Backing subsystem. #[derive(Debug)] pub enum CandidateBackingMessage { @@ -192,20 +161,34 @@ pub enum CollatorProtocolMessage { /// The result sender should be informed when at least one parachain validator seconded the collation. It is also /// completely okay to just drop the sender. DistributeCollation(CandidateReceipt, PoV, Option>), - /// Fetch a collation under the given relay-parent for the given ParaId. - FetchCollation(Hash, CollatorId, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>), /// Report a collator as having provided an invalid collation. This should lead to disconnect /// and blacklist of the collator. ReportCollator(CollatorId), - /// Note a collator as having provided a good collation. - NoteGoodCollation(CollatorId), - /// Notify a collator that its collation was seconded. - NotifyCollationSeconded(CollatorId, Hash, SignedFullStatement), /// Get a network bridge update. #[from] NetworkBridgeUpdateV1(NetworkBridgeEvent), /// Incoming network request for a collation. 
- CollationFetchingRequest(IncomingRequest) + CollationFetchingRequest(IncomingRequest), + /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. + /// + /// The hash is the relay parent. + Invalid(Hash, CandidateReceipt), + /// The candidate we recommended to be seconded was validated successfully. + /// + /// The hash is the relay parent. + Seconded(Hash, SignedFullStatement), +} + +impl Default for CollatorProtocolMessage { + fn default() -> Self { + Self::CollateOn(Default::default()) + } +} + +impl BoundToRelayParent for CollatorProtocolMessage { + fn relay_parent(&self) -> Hash { + Default::default() + } } /// Messages received by the network bridge subsystem. @@ -688,9 +671,6 @@ pub enum AllMessages { /// Message for the candidate backing subsystem. #[skip] CandidateBacking(CandidateBackingMessage), - /// Message for the candidate selection subsystem. - #[skip] - CandidateSelection(CandidateSelectionMessage), /// Message for the Chain API subsystem. #[skip] ChainApi(ChainApiMessage), diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md deleted file mode 100644 index 4439f10dd1..0000000000 --- a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-selection.md +++ /dev/null @@ -1,40 +0,0 @@ -# Candidate Selection - -The Candidate Selection Subsystem is run by validators, and is responsible for interfacing with Collators to select a candidate, along with its PoV, to second during the backing process relative to a specific relay parent. - -This subsystem includes networking code for communicating with collators, and tracks which collations specific collators have submitted. This subsystem is responsible for disconnecting and blacklisting collators who are found to have submitted invalid collations. Typically an invalid collation will be discovered by a different subsystem. 
- -This subsystem is only ever interested in parablocks assigned to the particular parachain which this validator is currently handling. - -New parablock candidates may arrive from a potentially unbounded set of collators. This subsystem chooses either 0 or 1 of them per relay parent to second. If it chooses to second a candidate, it sends an appropriate message to the [Candidate Backing subsystem](candidate-backing.md) to generate an appropriate [`Statement`](../../types/backing.md#statement-type). - -In the event that a parablock candidate proves invalid, this subsystem will receive a message back from the Candidate Backing subsystem indicating so. If that parablock candidate originated from a collator, this subsystem will blacklist that collator. If that parablock candidate originated from a peer, this subsystem generates a report for the [Misbehavior Arbitration subsystem](../utility/misbehavior-arbitration.md). - -## Protocol - -Input: [`CandidateSelectionMessage`](../../types/overseer-protocol.md#candidate-selection-message) - -Output: - -- [`CandidateBackingMessage`](../../types/overseer-protocol.md#candidate-backing-message)`::Second` -- Peer set manager: report peers (collators who have misbehaved) - -## Functionality - -Overarching network protocol + job for every relay-parent - -For the moment, the candidate selection algorithm is simply to second the first valid parablock candidate per relay head. See [Future Work](#future-work). - -## Candidate Selection Job - -- Aware of validator key and assignment -- One job for each relay-parent, which selects up to one collation for the Candidate Backing Subsystem - -## Future Work - -Several approaches have been discussed, but all have some issues: - -- The current approach is very straightforward. However, that protocol is vulnerable to a single collator which, as an attack or simply through chance, gets its block candidate to the node more often than its fair share of the time. 
-- It may be possible to do some BABE-like selection algorithm to choose an "Official" collator for the round, but that is tricky because the collator which produces the PoV does not necessarily actually produce the block. -- We could use relay-chain BABE randomness to generate some delay `D` on the order of 1 second, +- 1 second. The collator would then second the first valid parablock which arrives after `D`, or in case none has arrived by `2*D`, the last valid parablock which has arrived. This makes it very hard for a collator to game the system to always get its block nominated, but it reduces the maximum throughput of the system by introducing delay into an already tight schedule. -- A variation of that scheme would be to randomly choose a number `I`, and have a fixed acceptance window `D` for parablock candidates. At the end of the period `D`, count `C`: the number of parablock candidates received. Second the one with index `I % C`. Its drawback is the same: it must wait the full `D` period before seconding any of its received candidates, reducing throughput. diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md index 052729acec..1afbaeb770 100644 --- a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md @@ -10,7 +10,7 @@ Validation of candidates is a heavy task, and furthermore, the [`PoV`][PoV] itse > TODO: note the incremental validation function Ximin proposes at https://github.com/paritytech/polkadot/issues/1348 -As this network protocol serves as a bridge between collators and validators, it communicates primarily with one subsystem on behalf of each. As a collator, this will receive messages from the [`CollationGeneration`][CG] subsystem. 
As a validator, this will communicate with the [`CandidateBacking`][CB] and [`CandidateSelection`][CS] subsystems. +As this network protocol serves as a bridge between collators and validators, it communicates primarily with one subsystem on behalf of each. As a collator, this will receive messages from the [`CollationGeneration`][CG] subsystem. As a validator, this will communicate only with the [`CandidateBacking`][CB] subsystem. ## Protocol @@ -20,7 +20,7 @@ Output: - [`RuntimeApiMessage`][RAM] - [`NetworkBridgeMessage`][NBM] -- [`CandidateSelectionMessage`][CSM] +- [`CandidateBackingMessage`][CBM] ## Functionality @@ -106,16 +106,24 @@ As a validator, we will handle requests from other subsystems to fetch a collati When acting on an advertisement, we issue a `Requests::CollationFetching`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators. -As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator` or `NoteGoodCollation` message. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it. +As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator`. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it. If the collation is seconded, we notify the collator and apply a benefit to the `PeerId` associated with the collator.
-### Interaction with [Candidate Selection][CS] +### Interaction with [Candidate Backing][CB] -As collators advertise the availability, we notify the Candidate Selection subsystem with a [`CandidateSelection`][CSM]`::Collation` message. Note that this message is lightweight: it only contains the relay parent, para id, and collator id. +As collators advertise the availability, a validator will simply second the first valid parablock candidate per relay head by sending a [`CandidateBackingMessage`][CBM]`::Second`. Note that this message contains the relay parent of the advertised collation, the candidate receipt and the [PoV][PoV]. -At that point, the Candidate Selection algorithm is free to use an arbitrary algorithm to determine which if any of these messages to follow up on. It is expected to use the [`CollatorProtocolMessage`][CPM]`::FetchCollation` message to follow up. +Subsequently, once a valid parablock candidate has been seconded, the [`CandidateBacking`][CB] subsystem will send a [`CollatorProtocolMessage`][CPM]`::Seconded`, which will trigger this subsystem to notify the collator at the `PeerId` that first advertised the parablock on the seconded relay head of their successful seconding. -The intent behind this design is to minimize the total number of (large) collations which must be transmitted. +## Future Work + +Several approaches have been discussed, but all have some issues: + +- The current approach is very straightforward. However, that protocol is vulnerable to a single collator which, as an attack or simply through chance, gets its block candidate to the node more often than its fair share of the time. +- If collators produce blocks via Aura, BABE or in future Sassafrass, it may be possible to choose an "Official" collator for the round, but it may be tricky to ensure that the PVF logic is enforced at collator leader election. +- We could use relay-chain BABE randomness to generate some delay `D` on the order of 1 second, +- 1 second. 
The collator would then second the first valid parablock which arrives after `D`, or in case none has arrived by `2*D`, the last valid parablock which has arrived. This makes it very hard for a collator to game the system to always get its block nominated, but it reduces the maximum throughput of the system by introducing delay into an already tight schedule. +- A variation of that scheme would be to have a fixed acceptance window `D` for parablock candidates and keep track of count `C`: the number of parablock candidates received. At the end of the period `D`, we choose a random number `I` in the range [0, C) and second the block at index `I`. Its drawback is the same: it must wait the full `D` period before seconding any of its received candidates, reducing throughput. +- In order to protect against DoS attacks, it may be prudent to throw out collations from collators that have behaved poorly (whether recently or historically) and subsequently only verify the PoV for the most suitable of collations. [CB]: ../backing/candidate-backing.md [CBM]: ../../types/overseer-protocol.md#candidate-backing-mesage