mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-07 01:28:07 +00:00
Remove candidate selection (#3148)
* Create validator_side module * Subsume Candidate Selection * Add test to ensure candidate backing logic is correct * Ensure secondings are adequately cleaned up and address test flakyness * Address Feedback
This commit is contained in:
Generated
-17
@@ -6055,22 +6055,6 @@ dependencies = [
|
||||
"wasm-timer",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "polkadot-node-core-candidate-selection"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures 0.3.14",
|
||||
"polkadot-node-primitives",
|
||||
"polkadot-node-subsystem",
|
||||
"polkadot-node-subsystem-test-helpers",
|
||||
"polkadot-node-subsystem-util",
|
||||
"polkadot-primitives",
|
||||
"sp-core",
|
||||
"sp-keystore",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "polkadot-node-core-candidate-validation"
|
||||
version = "0.1.0"
|
||||
@@ -6667,7 +6651,6 @@ dependencies = [
|
||||
"polkadot-node-core-av-store",
|
||||
"polkadot-node-core-backing",
|
||||
"polkadot-node-core-bitfield-signing",
|
||||
"polkadot-node-core-candidate-selection",
|
||||
"polkadot-node-core-candidate-validation",
|
||||
"polkadot-node-core-chain-api",
|
||||
"polkadot-node-core-parachains-inherent",
|
||||
|
||||
@@ -48,7 +48,6 @@ members = [
|
||||
"node/core/av-store",
|
||||
"node/core/backing",
|
||||
"node/core/bitfield-signing",
|
||||
"node/core/candidate-selection",
|
||||
"node/core/candidate-validation",
|
||||
"node/core/chain-api",
|
||||
"node/core/parachains-inherent",
|
||||
|
||||
@@ -39,7 +39,7 @@ use polkadot_subsystem::{
|
||||
jaeger,
|
||||
messages::{
|
||||
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
|
||||
CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage,
|
||||
CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
|
||||
ProvisionableData, ProvisionerMessage, RuntimeApiRequest,
|
||||
StatementDistributionMessage, ValidationFailed
|
||||
}
|
||||
@@ -600,14 +600,14 @@ impl CandidateBackingJob {
|
||||
root_span,
|
||||
).await? {
|
||||
sender.send_message(
|
||||
CandidateSelectionMessage::Seconded(self.parent, stmt).into()
|
||||
CollatorProtocolMessage::Seconded(self.parent, stmt).into()
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(candidate) => {
|
||||
sender.send_message(
|
||||
CandidateSelectionMessage::Invalid(self.parent, candidate).into()
|
||||
CollatorProtocolMessage::Invalid(self.parent, candidate).into()
|
||||
).await;
|
||||
}
|
||||
}
|
||||
@@ -685,7 +685,7 @@ impl CandidateBackingJob {
|
||||
.map_or(false, |c| c != &candidate.descriptor().collator)
|
||||
{
|
||||
sender.send_message(
|
||||
CandidateSelectionMessage::Invalid(self.parent, candidate.clone()).into()
|
||||
CollatorProtocolMessage::Invalid(self.parent, candidate.clone()).into()
|
||||
).await;
|
||||
return Ok(());
|
||||
}
|
||||
@@ -1332,7 +1332,7 @@ mod tests {
|
||||
use futures::{future, Future};
|
||||
use polkadot_primitives::v1::{GroupRotationInfo, HeadData, PersistedValidationData, ScheduledCore};
|
||||
use polkadot_subsystem::{
|
||||
messages::{RuntimeApiRequest, RuntimeApiMessage},
|
||||
messages::{RuntimeApiRequest, RuntimeApiMessage, CollatorProtocolMessage},
|
||||
ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf, LeafStatus,
|
||||
};
|
||||
use polkadot_node_primitives::{InvalidCandidate, BlockData};
|
||||
@@ -1648,7 +1648,7 @@ mod tests {
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::CandidateSelection(CandidateSelectionMessage::Seconded(hash, statement)) => {
|
||||
AllMessages::CollatorProtocol(CollatorProtocolMessage::Seconded(hash, statement)) => {
|
||||
assert_eq!(test_state.relay_parent, hash);
|
||||
assert_matches!(statement.payload(), Statement::Seconded(_));
|
||||
}
|
||||
@@ -2172,8 +2172,8 @@ mod tests {
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::CandidateSelection(
|
||||
CandidateSelectionMessage::Invalid(parent_hash, c)
|
||||
AllMessages::CollatorProtocol(
|
||||
CollatorProtocolMessage::Invalid(parent_hash, c)
|
||||
) if parent_hash == test_state.relay_parent && c == candidate_a.to_plain()
|
||||
);
|
||||
|
||||
@@ -2482,8 +2482,8 @@ mod tests {
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::CandidateSelection(
|
||||
CandidateSelectionMessage::Invalid(parent, c)
|
||||
AllMessages::CollatorProtocol(
|
||||
CollatorProtocolMessage::Invalid(parent, c)
|
||||
) if parent == test_state.relay_parent && c == candidate.to_plain() => {
|
||||
}
|
||||
);
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
[package]
|
||||
name = "polkadot-node-core-candidate-selection"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
futures = "0.3.12"
|
||||
tracing = "0.1.26"
|
||||
thiserror = "1.0.23"
|
||||
|
||||
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
polkadot-primitives = { path = "../../../primitives" }
|
||||
polkadot-node-subsystem = { path = "../../subsystem" }
|
||||
polkadot-node-primitives = { path = "../../primitives" }
|
||||
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
|
||||
|
||||
[dev-dependencies]
|
||||
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
|
||||
@@ -1,715 +0,0 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! The provisioner is responsible for assembling a relay chain block
|
||||
//! from a set of available parachain candidates of its choice.
|
||||
|
||||
#![deny(missing_docs, unused_crate_dependencies, unused_results)]
|
||||
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
prelude::*,
|
||||
};
|
||||
use sp_keystore::SyncCryptoStorePtr;
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger, PerLeafSpan, SubsystemSender,
|
||||
errors::ChainApiError,
|
||||
messages::{
|
||||
CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
|
||||
RuntimeApiRequest,
|
||||
},
|
||||
};
|
||||
use polkadot_node_subsystem_util::{
|
||||
self as util, request_from_runtime, request_validator_groups, JobSubsystem,
|
||||
JobTrait, JobSender, Validator, metrics::{self, prometheus},
|
||||
};
|
||||
use polkadot_primitives::v1::{
|
||||
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, BlockNumber,
|
||||
};
|
||||
use polkadot_node_primitives::{SignedFullStatement, PoV};
|
||||
use std::{pin::Pin, sync::Arc};
|
||||
use thiserror::Error;
|
||||
|
||||
const LOG_TARGET: &'static str = "parachain::candidate-selection";
|
||||
|
||||
/// A per-block job in the candidate selection subsystem.
|
||||
pub struct CandidateSelectionJob {
|
||||
assignment: ParaId,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
metrics: Metrics,
|
||||
seconded_candidate: Option<CollatorId>,
|
||||
}
|
||||
|
||||
/// Errors in the candidate selection subsystem.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
/// An error in utilities.
|
||||
#[error(transparent)]
|
||||
Util(#[from] util::Error),
|
||||
/// An error receiving on a oneshot channel.
|
||||
#[error(transparent)]
|
||||
OneshotRecv(#[from] oneshot::Canceled),
|
||||
/// An error interacting with the chain API.
|
||||
#[error(transparent)]
|
||||
ChainApi(#[from] ChainApiError),
|
||||
}
|
||||
|
||||
macro_rules! try_runtime_api {
|
||||
($x: expr) => {
|
||||
match $x {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
"Failed to fetch runtime API data for job",
|
||||
);
|
||||
|
||||
// We can't do candidate selection work if we don't have the
|
||||
// requisite runtime API data. But these errors should not take
|
||||
// down the node.
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl JobTrait for CandidateSelectionJob {
|
||||
type ToJob = CandidateSelectionMessage;
|
||||
type Error = Error;
|
||||
type RunArgs = SyncCryptoStorePtr;
|
||||
type Metrics = Metrics;
|
||||
|
||||
const NAME: &'static str = "CandidateSelectionJob";
|
||||
|
||||
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
||||
fn run<S: SubsystemSender>(
|
||||
relay_parent: Hash,
|
||||
span: Arc<jaeger::Span>,
|
||||
keystore: Self::RunArgs,
|
||||
metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
mut sender: JobSender<S>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
let span = PerLeafSpan::new(span, "candidate-selection");
|
||||
async move {
|
||||
let _span = span.child("query-runtime")
|
||||
.with_relay_parent(relay_parent)
|
||||
.with_stage(jaeger::Stage::CandidateSelection);
|
||||
let (groups, cores) = futures::try_join!(
|
||||
request_validator_groups(relay_parent, &mut sender).await,
|
||||
request_from_runtime(
|
||||
relay_parent,
|
||||
&mut sender,
|
||||
|tx| RuntimeApiRequest::AvailabilityCores(tx),
|
||||
).await,
|
||||
)?;
|
||||
|
||||
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
|
||||
let cores = try_runtime_api!(cores);
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("validator-construction")
|
||||
.with_relay_parent(relay_parent)
|
||||
.with_stage(jaeger::Stage::CandidateSelection);
|
||||
|
||||
let n_cores = cores.len();
|
||||
|
||||
let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await {
|
||||
Ok(validator) => validator,
|
||||
Err(util::Error::NotAValidator) => return Ok(()),
|
||||
Err(err) => return Err(Error::Util(err)),
|
||||
};
|
||||
|
||||
let assignment_span = span.child("find-assignment")
|
||||
.with_relay_parent(relay_parent)
|
||||
.with_stage(jaeger::Stage::CandidateSelection);
|
||||
|
||||
#[derive(Debug)]
|
||||
enum AssignmentState {
|
||||
Unassigned,
|
||||
Scheduled(ParaId),
|
||||
Occupied(BlockNumber),
|
||||
Free,
|
||||
}
|
||||
|
||||
let mut assignment = AssignmentState::Unassigned;
|
||||
|
||||
for (idx, core) in cores.into_iter().enumerate() {
|
||||
let core_index = CoreIndex(idx as _);
|
||||
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
|
||||
if let Some(g) = validator_groups.get(group_index.0 as usize) {
|
||||
if g.contains(&validator.index()) {
|
||||
match core {
|
||||
CoreState::Scheduled(scheduled) => {
|
||||
assignment = AssignmentState::Scheduled(scheduled.para_id);
|
||||
}
|
||||
CoreState::Occupied(occupied) => {
|
||||
// Ignore prospective assignments on occupied cores
|
||||
// for the time being.
|
||||
assignment = AssignmentState::Occupied(occupied.occupied_since);
|
||||
}
|
||||
CoreState::Free => {
|
||||
assignment = AssignmentState::Free;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (assignment, assignment_span) = match assignment {
|
||||
AssignmentState::Scheduled(assignment) => {
|
||||
let assignment_span = assignment_span
|
||||
.with_string_tag("assigned", "true")
|
||||
.with_para_id(assignment);
|
||||
|
||||
(assignment, assignment_span)
|
||||
}
|
||||
assignment => {
|
||||
let _assignment_span = assignment_span.with_string_tag("assigned", "false");
|
||||
|
||||
let validator_index = validator.index();
|
||||
let validator_id = validator.id();
|
||||
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
?relay_parent,
|
||||
?validator_index,
|
||||
?validator_id,
|
||||
?assignment,
|
||||
"No assignment. Will not select candidate."
|
||||
);
|
||||
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
|
||||
drop(assignment_span);
|
||||
|
||||
CandidateSelectionJob::new(assignment, metrics, receiver)
|
||||
.run_loop(&span, sender.subsystem_sender())
|
||||
.await
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl CandidateSelectionJob {
|
||||
fn new(
|
||||
assignment: ParaId,
|
||||
metrics: Metrics,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
) -> Self {
|
||||
Self {
|
||||
receiver,
|
||||
metrics,
|
||||
assignment,
|
||||
seconded_candidate: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_loop(
|
||||
&mut self,
|
||||
span: &jaeger::Span,
|
||||
sender: &mut impl SubsystemSender,
|
||||
) -> Result<(), Error> {
|
||||
let span = span.child("run-loop")
|
||||
.with_stage(jaeger::Stage::CandidateSelection);
|
||||
|
||||
loop {
|
||||
match self.receiver.next().await {
|
||||
Some(CandidateSelectionMessage::Collation(
|
||||
relay_parent,
|
||||
para_id,
|
||||
collator_id,
|
||||
)) => {
|
||||
let _span = span.child("handle-collation");
|
||||
self.handle_collation(sender, relay_parent, para_id, collator_id).await;
|
||||
}
|
||||
Some(CandidateSelectionMessage::Invalid(
|
||||
_relay_parent,
|
||||
candidate_receipt,
|
||||
)) => {
|
||||
let _span = span.child("handle-invalid")
|
||||
.with_stage(jaeger::Stage::CandidateSelection)
|
||||
.with_candidate(candidate_receipt.hash())
|
||||
.with_relay_parent(_relay_parent);
|
||||
self.handle_invalid(sender, candidate_receipt).await;
|
||||
}
|
||||
Some(CandidateSelectionMessage::Seconded(relay_parent, statement)) => {
|
||||
let _span = span.child("handle-seconded")
|
||||
.with_stage(jaeger::Stage::CandidateSelection)
|
||||
.with_candidate(statement.payload().candidate_hash())
|
||||
.with_relay_parent(relay_parent);
|
||||
self.handle_seconded(sender, relay_parent, statement).await;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
|
||||
async fn handle_collation(
|
||||
&mut self,
|
||||
sender: &mut impl SubsystemSender,
|
||||
relay_parent: Hash,
|
||||
para_id: ParaId,
|
||||
collator_id: CollatorId,
|
||||
) {
|
||||
let _timer = self.metrics.time_handle_collation();
|
||||
|
||||
if self.assignment != para_id {
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Collator {:?} sent a collation outside of our assignment {:?}",
|
||||
collator_id,
|
||||
para_id,
|
||||
);
|
||||
forward_invalidity_note(&collator_id, sender).await;
|
||||
return;
|
||||
}
|
||||
|
||||
if self.seconded_candidate.is_none() {
|
||||
let (candidate_receipt, pov) =
|
||||
match get_collation(
|
||||
relay_parent,
|
||||
para_id,
|
||||
collator_id.clone(),
|
||||
sender,
|
||||
).await {
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
err = ?err,
|
||||
"failed to get collation from collator protocol subsystem",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
second_candidate(
|
||||
relay_parent,
|
||||
candidate_receipt,
|
||||
pov,
|
||||
sender,
|
||||
&self.metrics,
|
||||
).await;
|
||||
self.seconded_candidate = Some(collator_id);
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
|
||||
async fn handle_invalid(
|
||||
&mut self,
|
||||
sender: &mut impl SubsystemSender,
|
||||
candidate_receipt: CandidateReceipt,
|
||||
) {
|
||||
let _timer = self.metrics.time_handle_invalid();
|
||||
|
||||
let received_from = match &self.seconded_candidate {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"received invalidity notice for a candidate we don't remember seconding"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
candidate_receipt = ?candidate_receipt,
|
||||
"received invalidity note for candidate",
|
||||
);
|
||||
|
||||
forward_invalidity_note(received_from, sender).await;
|
||||
self.metrics.on_invalid_selection();
|
||||
}
|
||||
|
||||
async fn handle_seconded(
|
||||
&mut self,
|
||||
sender: &mut impl SubsystemSender,
|
||||
relay_parent: Hash,
|
||||
statement: SignedFullStatement,
|
||||
) {
|
||||
let received_from = match &self.seconded_candidate {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"received seconded notice for a candidate we don't remember seconding"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
statement = ?statement,
|
||||
"received seconded note for candidate",
|
||||
);
|
||||
|
||||
sender
|
||||
.send_message(CollatorProtocolMessage::NoteGoodCollation(received_from.clone()).into())
|
||||
.await;
|
||||
|
||||
sender.send_message(
|
||||
CollatorProtocolMessage::NotifyCollationSeconded(
|
||||
received_from.clone(),
|
||||
relay_parent,
|
||||
statement
|
||||
).into()
|
||||
).await;
|
||||
}
|
||||
}
|
||||
|
||||
// get a collation from the Collator Protocol subsystem
|
||||
//
|
||||
// note that this gets an owned clone of the sender; that's becuase unlike `forward_invalidity_note`, it's expected to take a while longer
|
||||
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
|
||||
async fn get_collation(
|
||||
relay_parent: Hash,
|
||||
para_id: ParaId,
|
||||
collator_id: CollatorId,
|
||||
sender: &mut impl SubsystemSender,
|
||||
) -> Result<(CandidateReceipt, PoV), Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
sender
|
||||
.send_message(CollatorProtocolMessage::FetchCollation(
|
||||
relay_parent,
|
||||
collator_id,
|
||||
para_id,
|
||||
tx,
|
||||
).into())
|
||||
.await;
|
||||
|
||||
rx.await.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn second_candidate(
|
||||
relay_parent: Hash,
|
||||
candidate_receipt: CandidateReceipt,
|
||||
pov: PoV,
|
||||
sender: &mut impl SubsystemSender,
|
||||
metrics: &Metrics,
|
||||
) {
|
||||
sender
|
||||
.send_message(CandidateBackingMessage::Second(
|
||||
relay_parent,
|
||||
candidate_receipt,
|
||||
pov,
|
||||
).into())
|
||||
.await;
|
||||
|
||||
metrics.on_second();
|
||||
}
|
||||
|
||||
async fn forward_invalidity_note(
|
||||
received_from: &CollatorId,
|
||||
sender: &mut impl SubsystemSender,
|
||||
) {
|
||||
sender
|
||||
.send_message(CollatorProtocolMessage::ReportCollator(received_from.clone()).into())
|
||||
.await
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MetricsInner {
|
||||
seconds: prometheus::Counter<prometheus::U64>,
|
||||
invalid_selections: prometheus::Counter<prometheus::U64>,
|
||||
handle_collation: prometheus::Histogram,
|
||||
handle_invalid: prometheus::Histogram,
|
||||
}
|
||||
|
||||
/// Candidate selection metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Metrics(Option<MetricsInner>);
|
||||
|
||||
impl Metrics {
|
||||
fn on_second(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.seconds.inc();
|
||||
}
|
||||
}
|
||||
|
||||
fn on_invalid_selection(&self) {
|
||||
if let Some(metrics) = &self.0 {
|
||||
metrics.invalid_selections.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide a timer for `handle_collation` which observes on drop.
|
||||
fn time_handle_collation(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.handle_collation.start_timer())
|
||||
}
|
||||
|
||||
/// Provide a timer for `handle_invalid` which observes on drop.
|
||||
fn time_handle_invalid(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
|
||||
self.0.as_ref().map(|metrics| metrics.handle_invalid.start_timer())
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::Metrics for Metrics {
|
||||
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
|
||||
let metrics = MetricsInner {
|
||||
seconds: prometheus::register(
|
||||
prometheus::Counter::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"candidate_selection_seconds_total",
|
||||
"Number of Candidate Selection subsystem seconding events.",
|
||||
),
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
invalid_selections: prometheus::register(
|
||||
prometheus::Counter::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"candidate_selection_invalid_selections_total",
|
||||
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
|
||||
),
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
handle_collation: prometheus::register(
|
||||
prometheus::Histogram::with_opts(
|
||||
prometheus::HistogramOpts::new(
|
||||
"parachain_candidate_selection_handle_collation",
|
||||
"Time spent within `candidate_selection::handle_collation`",
|
||||
)
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
handle_invalid: prometheus::register(
|
||||
prometheus::Histogram::with_opts(
|
||||
prometheus::HistogramOpts::new(
|
||||
"parachain_candidate_selection:handle_invalid",
|
||||
"Time spent within `candidate_selection::handle_invalid`",
|
||||
)
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
};
|
||||
Ok(Metrics(Some(metrics)))
|
||||
}
|
||||
}
|
||||
|
||||
/// The candidate selection subsystem.
|
||||
pub type CandidateSelectionSubsystem<Spawner> = JobSubsystem<CandidateSelectionJob, Spawner>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::lock::Mutex;
|
||||
use polkadot_node_primitives::BlockData;
|
||||
use polkadot_node_subsystem::messages::AllMessages;
|
||||
use sp_core::crypto::Public;
|
||||
use std::sync::Arc;
|
||||
|
||||
fn test_harness<Preconditions, TestBuilder, Test, Postconditions>(
|
||||
preconditions: Preconditions,
|
||||
test: TestBuilder,
|
||||
postconditions: Postconditions,
|
||||
) where
|
||||
Preconditions: FnOnce(&mut CandidateSelectionJob),
|
||||
TestBuilder: FnOnce(mpsc::Sender<CandidateSelectionMessage>, mpsc::UnboundedReceiver<AllMessages>) -> Test,
|
||||
Test: Future<Output = ()>,
|
||||
Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>),
|
||||
{
|
||||
let (to_job_tx, to_job_rx) = mpsc::channel(0);
|
||||
let (mut from_job_tx, from_job_rx) = polkadot_node_subsystem_test_helpers::sender_receiver();
|
||||
let mut job = CandidateSelectionJob {
|
||||
assignment: 123.into(),
|
||||
receiver: to_job_rx,
|
||||
metrics: Default::default(),
|
||||
seconded_candidate: None,
|
||||
};
|
||||
|
||||
preconditions(&mut job);
|
||||
let span = jaeger::Span::Disabled;
|
||||
let (_, (job, job_result)) = futures::executor::block_on(future::join(
|
||||
test(to_job_tx, from_job_rx),
|
||||
async move {
|
||||
let res = job.run_loop(&span, &mut from_job_tx).await;
|
||||
drop(from_job_tx);
|
||||
(job, res)
|
||||
},
|
||||
));
|
||||
|
||||
postconditions(job, job_result);
|
||||
}
|
||||
|
||||
/// when nothing is seconded so far, the collation is fetched and seconded
|
||||
#[test]
|
||||
fn fetches_and_seconds_a_collation() {
|
||||
let relay_parent = Hash::random();
|
||||
let para_id: ParaId = 123.into();
|
||||
let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
|
||||
let collator_id_clone = collator_id.clone();
|
||||
|
||||
let candidate_receipt = CandidateReceipt::default();
|
||||
let pov = PoV {
|
||||
block_data: BlockData((0..32).cycle().take(256).collect()),
|
||||
};
|
||||
|
||||
let was_seconded = Arc::new(Mutex::new(false));
|
||||
let was_seconded_clone = was_seconded.clone();
|
||||
|
||||
test_harness(
|
||||
|_job| {},
|
||||
|mut to_job, mut from_job| async move {
|
||||
to_job
|
||||
.send(CandidateSelectionMessage::Collation(
|
||||
relay_parent,
|
||||
para_id,
|
||||
collator_id_clone.clone(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
std::mem::drop(to_job);
|
||||
|
||||
while let Some(msg) = from_job.next().await {
|
||||
match msg {
|
||||
AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation(
|
||||
got_relay_parent,
|
||||
collator_id,
|
||||
got_para_id,
|
||||
return_sender,
|
||||
)) => {
|
||||
assert_eq!(got_relay_parent, relay_parent);
|
||||
assert_eq!(got_para_id, para_id);
|
||||
assert_eq!(collator_id, collator_id_clone);
|
||||
|
||||
return_sender
|
||||
.send((candidate_receipt.clone(), pov.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
AllMessages::CandidateBacking(CandidateBackingMessage::Second(
|
||||
got_relay_parent,
|
||||
got_candidate_receipt,
|
||||
got_pov,
|
||||
)) => {
|
||||
assert_eq!(got_relay_parent, relay_parent);
|
||||
assert_eq!(got_candidate_receipt, candidate_receipt);
|
||||
assert_eq!(got_pov, pov);
|
||||
|
||||
*was_seconded_clone.lock().await = true;
|
||||
}
|
||||
other => panic!("unexpected message from job: {:?}", other),
|
||||
}
|
||||
}
|
||||
},
|
||||
|job, job_result| {
|
||||
assert!(job_result.is_ok());
|
||||
assert_eq!(job.seconded_candidate.unwrap(), collator_id);
|
||||
},
|
||||
);
|
||||
|
||||
assert!(Arc::try_unwrap(was_seconded).unwrap().into_inner());
|
||||
}
|
||||
|
||||
/// when something has been seconded, further collation notifications are ignored
|
||||
#[test]
|
||||
fn ignores_collation_notifications_after_the_first() {
|
||||
let relay_parent = Hash::random();
|
||||
let para_id: ParaId = 123.into();
|
||||
let prev_collator_id = CollatorId::from_slice(&(0..32).rev().collect::<Vec<u8>>());
|
||||
let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
|
||||
let collator_id_clone = collator_id.clone();
|
||||
|
||||
let was_seconded = Arc::new(Mutex::new(false));
|
||||
let was_seconded_clone = was_seconded.clone();
|
||||
|
||||
test_harness(
|
||||
|job| job.seconded_candidate = Some(prev_collator_id.clone()),
|
||||
|mut to_job, mut from_job| async move {
|
||||
to_job
|
||||
.send(CandidateSelectionMessage::Collation(
|
||||
relay_parent,
|
||||
para_id,
|
||||
collator_id_clone,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
std::mem::drop(to_job);
|
||||
|
||||
while let Some(msg) = from_job.next().await {
|
||||
match msg {
|
||||
AllMessages::CandidateBacking(CandidateBackingMessage::Second(
|
||||
_got_relay_parent,
|
||||
_got_candidate_receipt,
|
||||
_got_pov,
|
||||
)) => {
|
||||
*was_seconded_clone.lock().await = true;
|
||||
}
|
||||
other => panic!("unexpected message from job: {:?}", other),
|
||||
}
|
||||
}
|
||||
},
|
||||
|job, job_result| {
|
||||
assert!(job_result.is_ok());
|
||||
assert_eq!(job.seconded_candidate.unwrap(), prev_collator_id);
|
||||
},
|
||||
);
|
||||
|
||||
assert!(!Arc::try_unwrap(was_seconded).unwrap().into_inner());
|
||||
}
|
||||
|
||||
/// reports of invalidity from candidate backing are propagated
|
||||
#[test]
|
||||
fn propagates_invalidity_reports() {
|
||||
let relay_parent = Hash::random();
|
||||
let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
|
||||
let collator_id_clone = collator_id.clone();
|
||||
|
||||
let candidate_receipt = CandidateReceipt::default();
|
||||
|
||||
let sent_report = Arc::new(Mutex::new(false));
|
||||
let sent_report_clone = sent_report.clone();
|
||||
|
||||
test_harness(
|
||||
|job| job.seconded_candidate = Some(collator_id.clone()),
|
||||
|mut to_job, mut from_job| async move {
|
||||
to_job
|
||||
.send(CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
std::mem::drop(to_job);
|
||||
|
||||
while let Some(msg) = from_job.next().await {
|
||||
match msg {
|
||||
AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator(
|
||||
got_collator_id,
|
||||
)) => {
|
||||
assert_eq!(got_collator_id, collator_id_clone);
|
||||
|
||||
*sent_report_clone.lock().await = true;
|
||||
}
|
||||
other => panic!("unexpected message from job: {:?}", other),
|
||||
}
|
||||
}
|
||||
},
|
||||
|job, job_result| {
|
||||
assert!(job_result.is_ok());
|
||||
assert_eq!(job.seconded_candidate.unwrap(), collator_id);
|
||||
},
|
||||
);
|
||||
|
||||
assert!(Arc::try_unwrap(sent_report).unwrap().into_inner());
|
||||
}
|
||||
}
|
||||
@@ -2365,7 +2365,6 @@ mod tests {
|
||||
match msg {
|
||||
AllMessages::CandidateValidation(_) => unreachable!("Not interested in network events"),
|
||||
AllMessages::CandidateBacking(_) => unreachable!("Not interested in network events"),
|
||||
AllMessages::CandidateSelection(_) => unreachable!("Not interested in network events"),
|
||||
AllMessages::ChainApi(_) => unreachable!("Not interested in network events"),
|
||||
AllMessages::CollatorProtocol(_) => unreachable!("Not interested in network events"),
|
||||
AllMessages::StatementDistribution(_) => { cnt += 1; }
|
||||
|
||||
@@ -570,30 +570,12 @@ async fn process_msg(
|
||||
}
|
||||
}
|
||||
}
|
||||
FetchCollation(_, _, _, _) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"FetchCollation message is not expected on the collator side of the protocol",
|
||||
);
|
||||
}
|
||||
ReportCollator(_) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"ReportCollator message is not expected on the collator side of the protocol",
|
||||
);
|
||||
}
|
||||
NoteGoodCollation(_) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"NoteGoodCollation message is not expected on the collator side of the protocol",
|
||||
);
|
||||
}
|
||||
NotifyCollationSeconded(_, _, _) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"NotifyCollationSeconded message is not expected on the collator side of the protocol",
|
||||
);
|
||||
}
|
||||
NetworkBridgeUpdateV1(event) => {
|
||||
if let Err(e) = handle_network_msg(
|
||||
ctx,
|
||||
@@ -646,6 +628,7 @@ async fn process_msg(
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
+495
-271
File diff suppressed because it is too large
Load Diff
@@ -294,8 +294,9 @@ pub mod v1 {
|
||||
use polkadot_primitives::v1::{
|
||||
CandidateHash, CandidateIndex, CollatorId, CollatorSignature,
|
||||
CompactStatement, Hash, Id as ParaId, UncheckedSignedAvailabilityBitfield,
|
||||
ValidatorIndex, ValidatorSignature
|
||||
ValidatorIndex, ValidatorSignature,
|
||||
};
|
||||
|
||||
use polkadot_node_primitives::{
|
||||
approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
|
||||
UncheckedSignedFullStatement,
|
||||
|
||||
@@ -82,7 +82,7 @@ use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||
|
||||
use polkadot_subsystem::messages::{
|
||||
CandidateValidationMessage, CandidateBackingMessage,
|
||||
CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
|
||||
ChainApiMessage, StatementDistributionMessage,
|
||||
AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
|
||||
ProvisionerMessage, RuntimeApiMessage,
|
||||
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage,
|
||||
@@ -148,7 +148,7 @@ impl<Client> HeadSupportsParachains for Arc<Client> where
|
||||
/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`].
|
||||
#[derive(Debug, Clone, AllSubsystemsGen)]
|
||||
pub struct AllSubsystems<
|
||||
CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
|
||||
CV = (), CB = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
|
||||
RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (),
|
||||
GS = (),
|
||||
> {
|
||||
@@ -156,8 +156,6 @@ pub struct AllSubsystems<
|
||||
pub candidate_validation: CV,
|
||||
/// A candidate backing subsystem.
|
||||
pub candidate_backing: CB,
|
||||
/// A candidate selection subsystem.
|
||||
pub candidate_selection: CS,
|
||||
/// A statement distribution subsystem.
|
||||
pub statement_distribution: SD,
|
||||
/// An availability distribution subsystem.
|
||||
@@ -190,8 +188,8 @@ pub struct AllSubsystems<
|
||||
pub gossip_support: GS,
|
||||
}
|
||||
|
||||
impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
{
|
||||
/// Create a new instance of [`AllSubsystems`].
|
||||
///
|
||||
@@ -223,12 +221,10 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
DummySubsystem,
|
||||
> {
|
||||
AllSubsystems {
|
||||
candidate_validation: DummySubsystem,
|
||||
candidate_backing: DummySubsystem,
|
||||
candidate_selection: DummySubsystem,
|
||||
statement_distribution: DummySubsystem,
|
||||
availability_distribution: DummySubsystem,
|
||||
availability_recovery: DummySubsystem,
|
||||
@@ -247,11 +243,10 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
}
|
||||
}
|
||||
|
||||
fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
|
||||
fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
|
||||
AllSubsystems {
|
||||
candidate_validation: &self.candidate_validation,
|
||||
candidate_backing: &self.candidate_backing,
|
||||
candidate_selection: &self.candidate_selection,
|
||||
statement_distribution: &self.statement_distribution,
|
||||
availability_distribution: &self.availability_distribution,
|
||||
availability_recovery: &self.availability_recovery,
|
||||
@@ -274,7 +269,6 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
-> AllSubsystems<
|
||||
<M as MapSubsystem<CV>>::Output,
|
||||
<M as MapSubsystem<CB>>::Output,
|
||||
<M as MapSubsystem<CS>>::Output,
|
||||
<M as MapSubsystem<SD>>::Output,
|
||||
<M as MapSubsystem<AD>>::Output,
|
||||
<M as MapSubsystem<AR>>::Output,
|
||||
@@ -294,7 +288,6 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
where
|
||||
M: MapSubsystem<CV>,
|
||||
M: MapSubsystem<CB>,
|
||||
M: MapSubsystem<CS>,
|
||||
M: MapSubsystem<SD>,
|
||||
M: MapSubsystem<AD>,
|
||||
M: MapSubsystem<AR>,
|
||||
@@ -314,7 +307,6 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
|
||||
AllSubsystems {
|
||||
candidate_validation: m.map_subsystem(self.candidate_validation),
|
||||
candidate_backing: m.map_subsystem(self.candidate_backing),
|
||||
candidate_selection: m.map_subsystem(self.candidate_selection),
|
||||
statement_distribution: m.map_subsystem(self.statement_distribution),
|
||||
availability_distribution: m.map_subsystem(self.availability_distribution),
|
||||
availability_recovery: m.map_subsystem(self.availability_recovery),
|
||||
@@ -338,7 +330,7 @@ type AllSubsystemsSame<T> = AllSubsystems<
|
||||
T, T, T, T, T,
|
||||
T, T, T, T, T,
|
||||
T, T, T, T, T,
|
||||
T, T, T,
|
||||
T, T,
|
||||
>;
|
||||
|
||||
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
|
||||
@@ -546,7 +538,6 @@ fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> {
|
||||
struct ChannelsOut {
|
||||
candidate_validation: metered::MeteredSender<MessagePacket<CandidateValidationMessage>>,
|
||||
candidate_backing: metered::MeteredSender<MessagePacket<CandidateBackingMessage>>,
|
||||
candidate_selection: metered::MeteredSender<MessagePacket<CandidateSelectionMessage>>,
|
||||
statement_distribution: metered::MeteredSender<MessagePacket<StatementDistributionMessage>>,
|
||||
availability_distribution: metered::MeteredSender<MessagePacket<AvailabilityDistributionMessage>>,
|
||||
availability_recovery: metered::MeteredSender<MessagePacket<AvailabilityRecoveryMessage>>,
|
||||
@@ -565,7 +556,6 @@ struct ChannelsOut {
|
||||
|
||||
candidate_validation_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateValidationMessage>>,
|
||||
candidate_backing_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateBackingMessage>>,
|
||||
candidate_selection_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateSelectionMessage>>,
|
||||
statement_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<StatementDistributionMessage>>,
|
||||
availability_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityDistributionMessage>>,
|
||||
availability_recovery_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityRecoveryMessage>>,
|
||||
@@ -596,9 +586,6 @@ impl ChannelsOut {
|
||||
AllMessages::CandidateBacking(msg) => {
|
||||
self.candidate_backing.send(make_packet(signals_received, msg)).await
|
||||
},
|
||||
AllMessages::CandidateSelection(msg) => {
|
||||
self.candidate_selection.send(make_packet(signals_received, msg)).await
|
||||
},
|
||||
AllMessages::StatementDistribution(msg) => {
|
||||
self.statement_distribution.send(make_packet(signals_received, msg)).await
|
||||
},
|
||||
@@ -671,11 +658,6 @@ impl ChannelsOut {
|
||||
.unbounded_send(make_packet(signals_received, msg))
|
||||
.map_err(|e| e.into_send_error())
|
||||
},
|
||||
AllMessages::CandidateSelection(msg) => {
|
||||
self.candidate_selection_unbounded
|
||||
.unbounded_send(make_packet(signals_received, msg))
|
||||
.map_err(|e| e.into_send_error())
|
||||
},
|
||||
AllMessages::StatementDistribution(msg) => {
|
||||
self.statement_distribution_unbounded
|
||||
.unbounded_send(make_packet(signals_received, msg))
|
||||
@@ -1058,7 +1040,6 @@ pub struct Overseer<S, SupportsParachains> {
|
||||
subsystems: AllSubsystems<
|
||||
OverseenSubsystem<CandidateValidationMessage>,
|
||||
OverseenSubsystem<CandidateBackingMessage>,
|
||||
OverseenSubsystem<CandidateSelectionMessage>,
|
||||
OverseenSubsystem<StatementDistributionMessage>,
|
||||
OverseenSubsystem<AvailabilityDistributionMessage>,
|
||||
OverseenSubsystem<AvailabilityRecoveryMessage>,
|
||||
@@ -1379,9 +1360,9 @@ where
|
||||
/// #
|
||||
/// # }); }
|
||||
/// ```
|
||||
pub fn new<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>(
|
||||
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>(
|
||||
leaves: impl IntoIterator<Item = BlockInfo>,
|
||||
all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>,
|
||||
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>,
|
||||
prometheus_registry: Option<&prometheus::Registry>,
|
||||
supports_parachains: SupportsParachains,
|
||||
mut s: S,
|
||||
@@ -1389,7 +1370,6 @@ where
|
||||
where
|
||||
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
|
||||
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
|
||||
CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
|
||||
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
|
||||
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
|
||||
AR: Subsystem<OverseerSubsystemContext<AvailabilityRecoveryMessage>> + Send,
|
||||
@@ -1422,8 +1402,6 @@ where
|
||||
= metered::channel(CHANNEL_CAPACITY);
|
||||
let (candidate_backing_bounded_tx, candidate_backing_bounded_rx)
|
||||
= metered::channel(CHANNEL_CAPACITY);
|
||||
let (candidate_selection_bounded_tx, candidate_selection_bounded_rx)
|
||||
= metered::channel(CHANNEL_CAPACITY);
|
||||
let (statement_distribution_bounded_tx, statement_distribution_bounded_rx)
|
||||
= metered::channel(CHANNEL_CAPACITY);
|
||||
let (availability_distribution_bounded_tx, availability_distribution_bounded_rx)
|
||||
@@ -1459,8 +1437,6 @@ where
|
||||
= metered::unbounded();
|
||||
let (candidate_backing_unbounded_tx, candidate_backing_unbounded_rx)
|
||||
= metered::unbounded();
|
||||
let (candidate_selection_unbounded_tx, candidate_selection_unbounded_rx)
|
||||
= metered::unbounded();
|
||||
let (statement_distribution_unbounded_tx, statement_distribution_unbounded_rx)
|
||||
= metered::unbounded();
|
||||
let (availability_distribution_unbounded_tx, availability_distribution_unbounded_rx)
|
||||
@@ -1495,7 +1471,6 @@ where
|
||||
let channels_out = ChannelsOut {
|
||||
candidate_validation: candidate_validation_bounded_tx.clone(),
|
||||
candidate_backing: candidate_backing_bounded_tx.clone(),
|
||||
candidate_selection: candidate_selection_bounded_tx.clone(),
|
||||
statement_distribution: statement_distribution_bounded_tx.clone(),
|
||||
availability_distribution: availability_distribution_bounded_tx.clone(),
|
||||
availability_recovery: availability_recovery_bounded_tx.clone(),
|
||||
@@ -1514,7 +1489,6 @@ where
|
||||
|
||||
candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(),
|
||||
candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(),
|
||||
candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(),
|
||||
statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(),
|
||||
availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(),
|
||||
availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(),
|
||||
@@ -1558,19 +1532,6 @@ where
|
||||
TaskKind::Regular,
|
||||
)?;
|
||||
|
||||
let candidate_selection_subsystem = spawn(
|
||||
&mut s,
|
||||
candidate_selection_bounded_tx,
|
||||
stream::select(candidate_selection_bounded_rx, candidate_selection_unbounded_rx),
|
||||
candidate_selection_unbounded_tx.meter().clone(),
|
||||
channels_out.clone(),
|
||||
to_overseer_tx.clone(),
|
||||
all_subsystems.candidate_selection,
|
||||
&metrics,
|
||||
&mut running_subsystems,
|
||||
TaskKind::Regular,
|
||||
)?;
|
||||
|
||||
let statement_distribution_subsystem = spawn(
|
||||
&mut s,
|
||||
statement_distribution_bounded_tx,
|
||||
@@ -1777,7 +1738,6 @@ where
|
||||
let subsystems = AllSubsystems {
|
||||
candidate_validation: candidate_validation_subsystem,
|
||||
candidate_backing: candidate_backing_subsystem,
|
||||
candidate_selection: candidate_selection_subsystem,
|
||||
statement_distribution: statement_distribution_subsystem,
|
||||
availability_distribution: availability_distribution_subsystem,
|
||||
availability_recovery: availability_recovery_subsystem,
|
||||
@@ -1853,7 +1813,6 @@ where
|
||||
async fn stop(mut self) {
|
||||
let _ = self.subsystems.candidate_validation.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.candidate_backing.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.candidate_selection.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.statement_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.availability_distribution.send_signal(OverseerSignal::Conclude).await;
|
||||
let _ = self.subsystems.availability_recovery.send_signal(OverseerSignal::Conclude).await;
|
||||
@@ -2036,7 +1995,6 @@ where
|
||||
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
|
||||
self.subsystems.candidate_validation.send_signal(signal.clone()).await?;
|
||||
self.subsystems.candidate_backing.send_signal(signal.clone()).await?;
|
||||
self.subsystems.candidate_selection.send_signal(signal.clone()).await?;
|
||||
self.subsystems.statement_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.availability_distribution.send_signal(signal.clone()).await?;
|
||||
self.subsystems.availability_recovery.send_signal(signal.clone()).await?;
|
||||
@@ -2066,9 +2024,6 @@ where
|
||||
AllMessages::CandidateBacking(msg) => {
|
||||
self.subsystems.candidate_backing.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::CandidateSelection(msg) => {
|
||||
self.subsystems.candidate_selection.send_message(msg).await?;
|
||||
},
|
||||
AllMessages::StatementDistribution(msg) => {
|
||||
self.subsystems.statement_distribution.send_message(msg).await?;
|
||||
},
|
||||
@@ -3092,10 +3047,6 @@ mod tests {
|
||||
CandidateBackingMessage::GetBackedCandidates(Default::default(), Vec::new(), sender)
|
||||
}
|
||||
|
||||
fn test_candidate_selection_msg() -> CandidateSelectionMessage {
|
||||
CandidateSelectionMessage::default()
|
||||
}
|
||||
|
||||
fn test_chain_api_msg() -> ChainApiMessage {
|
||||
let (sender, _) = oneshot::channel();
|
||||
ChainApiMessage::FinalizedBlockNumber(sender)
|
||||
@@ -3177,7 +3128,7 @@ mod tests {
|
||||
// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
|
||||
#[test]
|
||||
fn overseer_all_subsystems_receive_signals_and_messages() {
|
||||
const NUM_SUBSYSTEMS: usize = 18;
|
||||
const NUM_SUBSYSTEMS: usize = 17;
|
||||
// -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution
|
||||
const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3;
|
||||
|
||||
@@ -3196,7 +3147,6 @@ mod tests {
|
||||
let all_subsystems = AllSubsystems {
|
||||
candidate_validation: subsystem.clone(),
|
||||
candidate_backing: subsystem.clone(),
|
||||
candidate_selection: subsystem.clone(),
|
||||
collation_generation: subsystem.clone(),
|
||||
collator_protocol: subsystem.clone(),
|
||||
statement_distribution: subsystem.clone(),
|
||||
@@ -3235,7 +3185,6 @@ mod tests {
|
||||
// except for BitfieldSigning and GossipSupport as the messages are not instantiable
|
||||
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
|
||||
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
|
||||
handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await;
|
||||
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
|
||||
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
|
||||
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
|
||||
@@ -3285,7 +3234,6 @@ mod tests {
|
||||
fn context_holds_onto_message_until_enough_signals_received() {
|
||||
let (candidate_validation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (candidate_backing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (candidate_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (statement_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (availability_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
let (availability_recovery_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
|
||||
@@ -3304,7 +3252,6 @@ mod tests {
|
||||
|
||||
let (candidate_validation_unbounded_tx, _) = metered::unbounded();
|
||||
let (candidate_backing_unbounded_tx, _) = metered::unbounded();
|
||||
let (candidate_selection_unbounded_tx, _) = metered::unbounded();
|
||||
let (statement_distribution_unbounded_tx, _) = metered::unbounded();
|
||||
let (availability_distribution_unbounded_tx, _) = metered::unbounded();
|
||||
let (availability_recovery_unbounded_tx, _) = metered::unbounded();
|
||||
@@ -3324,7 +3271,6 @@ mod tests {
|
||||
let channels_out = ChannelsOut {
|
||||
candidate_validation: candidate_validation_bounded_tx.clone(),
|
||||
candidate_backing: candidate_backing_bounded_tx.clone(),
|
||||
candidate_selection: candidate_selection_bounded_tx.clone(),
|
||||
statement_distribution: statement_distribution_bounded_tx.clone(),
|
||||
availability_distribution: availability_distribution_bounded_tx.clone(),
|
||||
availability_recovery: availability_recovery_bounded_tx.clone(),
|
||||
@@ -3343,7 +3289,6 @@ mod tests {
|
||||
|
||||
candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(),
|
||||
candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(),
|
||||
candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(),
|
||||
statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(),
|
||||
availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(),
|
||||
availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(),
|
||||
|
||||
@@ -97,7 +97,6 @@ polkadot-node-collation-generation = { path = "../collation-generation", optiona
|
||||
polkadot-node-core-av-store = { path = "../core/av-store", optional = true }
|
||||
polkadot-node-core-backing = { path = "../core/backing", optional = true }
|
||||
polkadot-node-core-bitfield-signing = { path = "../core/bitfield-signing", optional = true }
|
||||
polkadot-node-core-candidate-selection = { path = "../core/candidate-selection", optional = true }
|
||||
polkadot-node-core-candidate-validation = { path = "../core/candidate-validation", optional = true }
|
||||
polkadot-node-core-chain-api = { path = "../core/chain-api", optional = true }
|
||||
polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true }
|
||||
@@ -125,7 +124,6 @@ full-node = [
|
||||
"polkadot-node-collation-generation",
|
||||
"polkadot-node-core-backing",
|
||||
"polkadot-node-core-bitfield-signing",
|
||||
"polkadot-node-core-candidate-selection",
|
||||
"polkadot-node-core-candidate-validation",
|
||||
"polkadot-node-core-chain-api",
|
||||
"polkadot-node-core-provisioner",
|
||||
|
||||
@@ -447,7 +447,6 @@ where
|
||||
use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem;
|
||||
use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
|
||||
use polkadot_node_core_backing::CandidateBackingSubsystem;
|
||||
use polkadot_node_core_candidate_selection::CandidateSelectionSubsystem;
|
||||
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
|
||||
use polkadot_node_core_chain_api::ChainApiSubsystem;
|
||||
use polkadot_node_collation_generation::CollationGenerationSubsystem;
|
||||
@@ -486,11 +485,6 @@ where
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
),
|
||||
candidate_selection: CandidateSelectionSubsystem::new(
|
||||
spawner.clone(),
|
||||
keystore.clone(),
|
||||
Metrics::register(registry)?,
|
||||
),
|
||||
candidate_validation: CandidateValidationSubsystem::with_config(
|
||||
candidate_validation_config,
|
||||
Metrics::register(registry)?,
|
||||
|
||||
@@ -362,8 +362,8 @@ mod tests {
|
||||
use super::*;
|
||||
use polkadot_overseer::{Overseer, HeadSupportsParachains, AllSubsystems};
|
||||
use futures::executor::block_on;
|
||||
use polkadot_node_subsystem::messages::CandidateSelectionMessage;
|
||||
use polkadot_primitives::v1::Hash;
|
||||
use polkadot_node_subsystem::messages::CollatorProtocolMessage;
|
||||
|
||||
struct AlwaysSupportsParachains;
|
||||
impl HeadSupportsParachains for AlwaysSupportsParachains {
|
||||
@@ -374,7 +374,7 @@ mod tests {
|
||||
fn forward_subsystem_works() {
|
||||
let spawner = sp_core::testing::TaskExecutor::new();
|
||||
let (tx, rx) = mpsc::channel(2);
|
||||
let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_selection(ForwardSubsystem(tx));
|
||||
let all_subsystems = AllSubsystems::<()>::dummy().replace_collator_protocol(ForwardSubsystem(tx));
|
||||
let (overseer, mut handler) = Overseer::new(
|
||||
Vec::new(),
|
||||
all_subsystems,
|
||||
@@ -385,7 +385,7 @@ mod tests {
|
||||
|
||||
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
|
||||
|
||||
block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default())));
|
||||
assert!(matches!(block_on(rx.into_future()).0.unwrap(), CandidateSelectionMessage::Invalid(_, _)));
|
||||
block_on(handler.send_msg(CollatorProtocolMessage::CollateOn(Default::default())));
|
||||
assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -867,7 +867,7 @@ mod tests {
|
||||
use thiserror::Error;
|
||||
use polkadot_node_jaeger as jaeger;
|
||||
use polkadot_node_subsystem::{
|
||||
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
||||
messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
||||
SpawnedSubsystem, ActivatedLeaf, LeafStatus,
|
||||
};
|
||||
use assert_matches::assert_matches;
|
||||
@@ -884,8 +884,8 @@ mod tests {
|
||||
|
||||
// job structs are constructed within JobTrait::run
|
||||
// most will want to retain the sender and receiver, as well as whatever other data they like
|
||||
struct FakeCandidateSelectionJob {
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
struct FakeCollatorProtocolJob {
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
}
|
||||
|
||||
// Error will mostly be a wrapper to make the try operator more convenient;
|
||||
@@ -897,13 +897,13 @@ mod tests {
|
||||
Sending(#[from]mpsc::SendError),
|
||||
}
|
||||
|
||||
impl JobTrait for FakeCandidateSelectionJob {
|
||||
type ToJob = CandidateSelectionMessage;
|
||||
impl JobTrait for FakeCollatorProtocolJob {
|
||||
type ToJob = CollatorProtocolMessage;
|
||||
type Error = Error;
|
||||
type RunArgs = bool;
|
||||
type Metrics = ();
|
||||
|
||||
const NAME: &'static str = "FakeCandidateSelectionJob";
|
||||
const NAME: &'static str = "FakeCollatorProtocolJob";
|
||||
|
||||
/// Run a job for the parent block indicated
|
||||
//
|
||||
@@ -913,14 +913,14 @@ mod tests {
|
||||
_: Arc<jaeger::Span>,
|
||||
run_args: Self::RunArgs,
|
||||
_metrics: Self::Metrics,
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
receiver: mpsc::Receiver<CollatorProtocolMessage>,
|
||||
mut sender: JobSender<S>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
async move {
|
||||
let job = FakeCandidateSelectionJob { receiver };
|
||||
let job = FakeCollatorProtocolJob { receiver };
|
||||
|
||||
if run_args {
|
||||
sender.send_message(CandidateSelectionMessage::Invalid(
|
||||
sender.send_message(CollatorProtocolMessage::Invalid(
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
).into()).await;
|
||||
@@ -934,7 +934,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeCandidateSelectionJob {
|
||||
impl FakeCollatorProtocolJob {
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
loop {
|
||||
match self.receiver.next().await {
|
||||
@@ -950,11 +950,11 @@ mod tests {
|
||||
}
|
||||
|
||||
// with the job defined, it's straightforward to get a subsystem implementation.
|
||||
type FakeCandidateSelectionSubsystem<Spawner> =
|
||||
JobSubsystem<FakeCandidateSelectionJob, Spawner>;
|
||||
type FakeCollatorProtocolSubsystem<Spawner> =
|
||||
JobSubsystem<FakeCollatorProtocolJob, Spawner>;
|
||||
|
||||
// this type lets us pretend to be the overseer
|
||||
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
|
||||
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
|
||||
|
||||
fn test_harness<T: Future<Output = ()>>(
|
||||
run_args: bool,
|
||||
@@ -971,7 +971,7 @@ mod tests {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, overseer_handle) = make_subsystem_context(pool.clone());
|
||||
|
||||
let subsystem = FakeCandidateSelectionSubsystem::new(
|
||||
let subsystem = FakeCollatorProtocolSubsystem::new(
|
||||
pool,
|
||||
run_args,
|
||||
(),
|
||||
@@ -1005,7 +1005,7 @@ mod tests {
|
||||
.await;
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::CandidateSelection(_)
|
||||
AllMessages::CollatorProtocol(_)
|
||||
);
|
||||
overseer_handle
|
||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||
@@ -1045,7 +1045,7 @@ mod tests {
|
||||
// the subsystem is still alive
|
||||
assert_matches!(
|
||||
overseer_handle.recv().await,
|
||||
AllMessages::CandidateSelection(_)
|
||||
AllMessages::CollatorProtocol(_)
|
||||
);
|
||||
|
||||
overseer_handle
|
||||
@@ -1057,11 +1057,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_subsystem_impl_and_name_derivation() {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (context, _) = make_subsystem_context::<CandidateSelectionMessage, _>(pool.clone());
|
||||
let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());
|
||||
|
||||
let SpawnedSubsystem { name, .. } =
|
||||
FakeCandidateSelectionSubsystem::new(pool, false, ()).start(context);
|
||||
assert_eq!(name, "FakeCandidateSelection");
|
||||
FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);
|
||||
assert_eq!(name, "FakeCollatorProtocol");
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -64,37 +64,6 @@ pub trait BoundToRelayParent {
|
||||
fn relay_parent(&self) -> Hash;
|
||||
}
|
||||
|
||||
/// Messages received by the Candidate Selection subsystem.
|
||||
#[derive(Debug)]
|
||||
pub enum CandidateSelectionMessage {
|
||||
/// A candidate collation can be fetched from a collator and should be considered for seconding.
|
||||
Collation(Hash, ParaId, CollatorId),
|
||||
/// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
|
||||
///
|
||||
/// The hash is the relay parent.
|
||||
Invalid(Hash, CandidateReceipt),
|
||||
/// The candidate we recommended to be seconded was validated successfully.
|
||||
///
|
||||
/// The hash is the relay parent.
|
||||
Seconded(Hash, SignedFullStatement),
|
||||
}
|
||||
|
||||
impl BoundToRelayParent for CandidateSelectionMessage {
|
||||
fn relay_parent(&self) -> Hash {
|
||||
match self {
|
||||
Self::Collation(hash, ..) => *hash,
|
||||
Self::Invalid(hash, _) => *hash,
|
||||
Self::Seconded(hash, _) => *hash,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CandidateSelectionMessage {
|
||||
fn default() -> Self {
|
||||
CandidateSelectionMessage::Invalid(Default::default(), Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// Messages received by the Candidate Backing subsystem.
|
||||
#[derive(Debug)]
|
||||
pub enum CandidateBackingMessage {
|
||||
@@ -192,20 +161,34 @@ pub enum CollatorProtocolMessage {
|
||||
/// The result sender should be informed when at least one parachain validator seconded the collation. It is also
|
||||
/// completely okay to just drop the sender.
|
||||
DistributeCollation(CandidateReceipt, PoV, Option<oneshot::Sender<SignedFullStatement>>),
|
||||
/// Fetch a collation under the given relay-parent for the given ParaId.
|
||||
FetchCollation(Hash, CollatorId, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>),
|
||||
/// Report a collator as having provided an invalid collation. This should lead to disconnect
|
||||
/// and blacklist of the collator.
|
||||
ReportCollator(CollatorId),
|
||||
/// Note a collator as having provided a good collation.
|
||||
NoteGoodCollation(CollatorId),
|
||||
/// Notify a collator that its collation was seconded.
|
||||
NotifyCollationSeconded(CollatorId, Hash, SignedFullStatement),
|
||||
/// Get a network bridge update.
|
||||
#[from]
|
||||
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>),
|
||||
/// Incoming network request for a collation.
|
||||
CollationFetchingRequest(IncomingRequest<req_res_v1::CollationFetchingRequest>)
|
||||
CollationFetchingRequest(IncomingRequest<req_res_v1::CollationFetchingRequest>),
|
||||
/// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
|
||||
///
|
||||
/// The hash is the relay parent.
|
||||
Invalid(Hash, CandidateReceipt),
|
||||
/// The candidate we recommended to be seconded was validated successfully.
|
||||
///
|
||||
/// The hash is the relay parent.
|
||||
Seconded(Hash, SignedFullStatement),
|
||||
}
|
||||
|
||||
impl Default for CollatorProtocolMessage {
|
||||
fn default() -> Self {
|
||||
Self::CollateOn(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl BoundToRelayParent for CollatorProtocolMessage {
|
||||
fn relay_parent(&self) -> Hash {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Messages received by the network bridge subsystem.
|
||||
@@ -688,9 +671,6 @@ pub enum AllMessages {
|
||||
/// Message for the candidate backing subsystem.
|
||||
#[skip]
|
||||
CandidateBacking(CandidateBackingMessage),
|
||||
/// Message for the candidate selection subsystem.
|
||||
#[skip]
|
||||
CandidateSelection(CandidateSelectionMessage),
|
||||
/// Message for the Chain API subsystem.
|
||||
#[skip]
|
||||
ChainApi(ChainApiMessage),
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
# Candidate Selection
|
||||
|
||||
The Candidate Selection Subsystem is run by validators, and is responsible for interfacing with Collators to select a candidate, along with its PoV, to second during the backing process relative to a specific relay parent.
|
||||
|
||||
This subsystem includes networking code for communicating with collators, and tracks which collations specific collators have submitted. This subsystem is responsible for disconnecting and blacklisting collators who are found to have submitted invalid collations. Typically an invalid collation will be discovered by a different subsystem.
|
||||
|
||||
This subsystem is only ever interested in parablocks assigned to the particular parachain which this validator is currently handling.
|
||||
|
||||
New parablock candidates may arrive from a potentially unbounded set of collators. This subsystem chooses either 0 or 1 of them per relay parent to second. If it chooses to second a candidate, it sends an appropriate message to the [Candidate Backing subsystem](candidate-backing.md) to generate an appropriate [`Statement`](../../types/backing.md#statement-type).
|
||||
|
||||
In the event that a parablock candidate proves invalid, this subsystem will receive a message back from the Candidate Backing subsystem indicating so. If that parablock candidate originated from a collator, this subsystem will blacklist that collator. If that parablock candidate originated from a peer, this subsystem generates a report for the [Misbehavior Arbitration subsystem](../utility/misbehavior-arbitration.md).
|
||||
|
||||
## Protocol
|
||||
|
||||
Input: [`CandidateSelectionMessage`](../../types/overseer-protocol.md#candidate-selection-message)
|
||||
|
||||
Output:
|
||||
|
||||
- [`CandidateBackingMessage`](../../types/overseer-protocol.md#candidate-backing-message)`::Second`
|
||||
- Peer set manager: report peers (collators who have misbehaved)
|
||||
|
||||
## Functionality
|
||||
|
||||
Overarching network protocol + job for every relay-parent
|
||||
|
||||
For the moment, the candidate selection algorithm is simply to second the first valid parablock candidate per relay head. See [Future Work](#future-work).
|
||||
|
||||
## Candidate Selection Job
|
||||
|
||||
- Aware of validator key and assignment
|
||||
- One job for each relay-parent, which selects up to one collation for the Candidate Backing Subsystem
|
||||
|
||||
## Future Work
|
||||
|
||||
Several approaches have been discussed, but all have some issues:
|
||||
|
||||
- The current approach is very straightforward. However, that protocol is vulnerable to a single collator which, as an attack or simply through chance, gets its block candidate to the node more often than its fair share of the time.
|
||||
- It may be possible to do some BABE-like selection algorithm to choose an "Official" collator for the round, but that is tricky because the collator which produces the PoV does not necessarily actually produce the block.
|
||||
- We could use relay-chain BABE randomness to generate some delay `D` on the order of 1 second, +- 1 second. The collator would then second the first valid parablock which arrives after `D`, or in case none has arrived by `2*D`, the last valid parablock which has arrived. This makes it very hard for a collator to game the system to always get its block nominated, but it reduces the maximum throughput of the system by introducing delay into an already tight schedule.
|
||||
- A variation of that scheme would be to randomly choose a number `I`, and have a fixed acceptance window `D` for parablock candidates. At the end of the period `D`, count `C`: the number of parablock candidates received. Second the one with index `I % C`. Its drawback is the same: it must wait the full `D` period before seconding any of its received candidates, reducing throughput.
|
||||
@@ -10,7 +10,7 @@ Validation of candidates is a heavy task, and furthermore, the [`PoV`][PoV] itse
|
||||
|
||||
> TODO: note the incremental validation function Ximin proposes at https://github.com/paritytech/polkadot/issues/1348
|
||||
|
||||
As this network protocol serves as a bridge between collators and validators, it communicates primarily with one subsystem on behalf of each. As a collator, this will receive messages from the [`CollationGeneration`][CG] subsystem. As a validator, this will communicate with the [`CandidateBacking`][CB] and [`CandidateSelection`][CS] subsystems.
|
||||
As this network protocol serves as a bridge between collators and validators, it communicates primarily with one subsystem on behalf of each. As a collator, this will receive messages from the [`CollationGeneration`][CG] subsystem. As a validator, this will communicate only with the [`CandidateBacking`][CB] subsystem.
|
||||
|
||||
## Protocol
|
||||
|
||||
@@ -20,7 +20,7 @@ Output:
|
||||
|
||||
- [`RuntimeApiMessage`][RAM]
|
||||
- [`NetworkBridgeMessage`][NBM]
|
||||
- [`CandidateSelectionMessage`][CSM]
|
||||
- [`CandidateBackingMessage`][CBM]
|
||||
|
||||
## Functionality
|
||||
|
||||
@@ -106,16 +106,24 @@ As a validator, we will handle requests from other subsystems to fetch a collati
|
||||
|
||||
When acting on an advertisement, we issue a `Requests::CollationFetching`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators.
|
||||
|
||||
As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator` or `NoteGoodCollation` message. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it.
|
||||
As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator`. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it. If the collation is seconded, we notify the collator and apply a benefit to the `PeerId` associated with the collator.
|
||||
|
||||
### Interaction with [Candidate Selection][CS]
|
||||
### Interaction with [Candidate Backing][CB]
|
||||
|
||||
As collators advertise the availability, we notify the Candidate Selection subsystem with a [`CandidateSelection`][CSM]`::Collation` message. Note that this message is lightweight: it only contains the relay parent, para id, and collator id.
|
||||
As collators advertise the availability, a validator will simply second the first valid parablock candidate per relay head by sending a [`CandidateBackingMessage`][CBM]`::Second`. Note that this message contains the relay parent of the advertised collation, the candidate receipt and the [PoV][PoV].
|
||||
|
||||
At that point, the Candidate Selection algorithm is free to use an arbitrary algorithm to determine which if any of these messages to follow up on. It is expected to use the [`CollatorProtocolMessage`][CPM]`::FetchCollation` message to follow up.
|
||||
Subsequently, once a valid parablock candidate has been seconded, the [`CandidateBacking`][CB] subsystem will send a [`CollatorProtocolMessage`][CPM]`::Seconded`, which will trigger this subsystem to notify the collator at the `PeerId` that first advertised the parablock on the seconded relay head of their successful seconding.
|
||||
|
||||
The intent behind this design is to minimize the total number of (large) collations which must be transmitted.
|
||||
|
||||
## Future Work
|
||||
|
||||
Several approaches have been discussed, but all have some issues:
|
||||
|
||||
- The current approach is very straightforward. However, that protocol is vulnerable to a single collator which, as an attack or simply through chance, gets its block candidate to the node more often than its fair share of the time.
|
||||
- If collators produce blocks via Aura, BABE or in future Sassafras, it may be possible to choose an "Official" collator for the round, but it may be tricky to ensure that the PVF logic is enforced at collator leader election.
|
||||
- We could use relay-chain BABE randomness to generate some delay `D` on the order of 1 second, +- 1 second. The collator would then second the first valid parablock which arrives after `D`, or in case none has arrived by `2*D`, the last valid parablock which has arrived. This makes it very hard for a collator to game the system to always get its block nominated, but it reduces the maximum throughput of the system by introducing delay into an already tight schedule.
|
||||
- A variation of that scheme would be to have a fixed acceptance window `D` for parablock candidates and keep track of count `C`: the number of parablock candidates received. At the end of the period `D`, we choose a random number `I` in the range `[0, C)` and second the block at index `I`. Its drawback is the same: it must wait the full `D` period before seconding any of its received candidates, reducing throughput.
|
||||
- In order to protect against DoS attacks, it may be prudent to throw out collations from collators that have behaved poorly (whether recently or historically) and subsequently only verify the PoV for the most suitable of collations.
|
||||
|
||||
[CB]: ../backing/candidate-backing.md
|
||||
[CBM]: ../../types/overseer-protocol.md#candidate-backing-message
|
||||
|
||||
Reference in New Issue
Block a user