// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
//! Implements a `CandidateBackingSubsystem`.
#![deny(unused_crate_dependencies)]
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use bitvec::vec::BitVec;
use futures::{channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_primitives::v1::{
BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash,
CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId,
SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
};
use polkadot_node_primitives::{
Statement, SignedFullStatement, ValidationResult, PoV, AvailableData,
};
use polkadot_subsystem::{
PerLeafSpan, Stage, SubsystemSender,
jaeger,
messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage,
ProvisionableData, ProvisionerMessage, RuntimeApiRequest,
StatementDistributionMessage, ValidationFailed
}
};
use polkadot_node_subsystem_util::{
self as util,
request_session_index_for_child,
request_validator_groups,
request_validators,
request_from_runtime,
Validator,
FromJobCommand,
JobSender,
metrics::{self, prometheus},
};
use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
Context as TableContextTrait,
Table,
v1::{
SignedStatement as TableSignedStatement,
Statement as TableStatement,
Summary as TableSummary,
},
};
use thiserror::Error;
/// Value passed as `target:` to the `tracing` macros and recorded as the `subsystem` field of
/// the instrumented functions in this file.
const LOG_TARGET: &str = "parachain::candidate-backing";
/// Errors that can occur in candidate backing.
#[derive(Debug, Error)]
pub enum Error {
#[error("Candidate is not found")]
CandidateNotFound,
#[error("Signature is invalid")]
InvalidSignature,
#[error("Failed to send candidates {0:?}")]
Send(Vec),
#[error("FetchPoV failed")]
FetchPoV,
#[error("ValidateFromChainState channel closed before receipt")]
ValidateFromChainState(#[source] oneshot::Canceled),
#[error("StoreAvailableData channel closed before receipt")]
StoreAvailableData(#[source] oneshot::Canceled),
#[error("a channel was closed before receipt in try_join!")]
JoinMultiple(#[source] oneshot::Canceled),
#[error("Obtaining erasure chunks failed")]
ObtainErasureChunks(#[from] erasure_coding::Error),
#[error(transparent)]
ValidationFailed(#[from] ValidationFailed),
#[error(transparent)]
Mpsc(#[from] mpsc::SendError),
#[error(transparent)]
UtilError(#[from] util::Error),
}
/// PoV data to validate.
enum PoVData {
/// Allready available (from candidate selection).
Ready(Arc),
/// Needs to be fetched from validator (we are checking a signed statement).
FetchFromValidator {
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
pov_hash: Hash,
},
}
/// Outcome of a background validation.
///
/// Sent by `validate_and_make_available` over its `tx_command` channel and processed by
/// `CandidateBackingJob::handle_validated_candidate_command`.
enum ValidatedCandidateCommand {
/// We were instructed to second the candidate that has been already validated.
Second(BackgroundValidationResult),
/// We were instructed to validate the candidate.
Attest(BackgroundValidationResult),
/// We were not able to `Attest` because backing validator did not send us the PoV.
AttestNoPoV(CandidateHash),
}
// Compact debug output: only the variant name and the candidate hash are printed, not the
// (potentially large) validation result carried by the command.
impl std::fmt::Debug for ValidatedCandidateCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Each variant gets its own label; `AttestNoPoV` must not be reported as `Attest`,
        // otherwise a failed PoV fetch is indistinguishable from a finished attestation in logs.
        let variant = match self {
            ValidatedCandidateCommand::Second(_) => "Second",
            ValidatedCandidateCommand::Attest(_) => "Attest",
            ValidatedCandidateCommand::AttestNoPoV(_) => "AttestNoPoV",
        };
        write!(f, "{}({})", variant, self.candidate_hash())
    }
}
impl ValidatedCandidateCommand {
fn candidate_hash(&self) -> CandidateHash {
match *self {
ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(),
ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
}
}
}
/// Holds all data needed for candidate backing job operation.
pub struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing it's work.
parent: Hash,
/// The `ParaId` assigned to this validator
assignment: Option,
/// The collator required to author the candidate, if any.
required_collator: Option,
/// Spans for all candidates that are not yet backable.
unbacked_candidates: HashMap,
/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
issued_statements: HashSet,
/// These candidates are undergoing validation in the background.
awaiting_validation: HashSet,
/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
fallbacks: HashMap)>,
/// `Some(h)` if this job has already issued `Seconded` statement for some candidate with `h` hash.
seconded: Option,
/// The candidates that are includable, by hash. Each entry here indicates
/// that we've sent the provisioner the backed candidate.
backed: HashSet,
keystore: SyncCryptoStorePtr,
table: Table,
table_context: TableContext,
background_validation: mpsc::Receiver,
background_validation_tx: mpsc::Sender,
metrics: Metrics,
}
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this. Basically all the data needed for spawning a
/// validation job and a list of backing validators, we can try.
#[derive(Clone)]
struct AttestingData {
/// The candidate to attest.
candidate: CandidateReceipt,
/// Hash of the PoV we need to fetch.
pov_hash: Hash,
/// Validator we are currently trying to get the PoV from.
from_validator: ValidatorIndex,
/// Other backing validators we can try in case `from_validator` failed.
backing: Vec,
}
/// Number of votes needed from a group of `n_validators` members: a strict majority,
/// i.e. `floor(n_validators / 2) + 1`.
const fn group_quorum(n_validators: usize) -> usize {
    let half = n_validators / 2;
    half + 1
}
#[derive(Default)]
struct TableContext {
signing_context: SigningContext,
validator: Option,
groups: HashMap>,
validators: Vec,
}
impl TableContextTrait for TableContext {
    type AuthorityId = ValidatorIndex;
    type Digest = CandidateHash;
    type GroupId = ParaId;
    type Signature = ValidatorSignature;
    type Candidate = CommittedCandidateReceipt;

    /// The digest of a candidate is its hash.
    fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
        candidate.hash()
    }

    /// The group responsible for a candidate is identified by the candidate's para id.
    fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
        candidate.descriptor().para_id
    }

    /// Whether `authority` belongs to `group`. Unknown groups have no members.
    fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
        self.groups.get(group).map_or(false, |members| members.contains(authority))
    }

    /// Votes required for `group`: a strict majority of its members.
    ///
    /// For an unknown group the maximum `usize` is returned, so the threshold is unreachable.
    fn requisite_votes(&self, group: &ParaId) -> usize {
        self.groups.get(group).map_or(usize::max_value(), |members| group_quorum(members.len()))
    }
}
/// Marker returned by `make_pov_available` when the erasure root computed from the available
/// data differs from the expected one.
struct InvalidErasureRoot;
/// Converts a signed primitive statement into the representation used by the statement table.
///
/// A plain function rather than an `impl From`, which does not look possible given the current
/// state of the code.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
    let sender = s.validator_index();
    let signature = s.signature().clone();
    let statement = match s.payload() {
        Statement::Seconded(receipt) => TableStatement::Seconded(receipt.clone()),
        Statement::Valid(hash) => TableStatement::Valid(hash.clone()),
    };

    TableSignedStatement { statement, signature, sender }
}
#[tracing::instrument(level = "trace", skip(attested, table_context), fields(subsystem = LOG_TARGET))]
fn table_attested_to_backed(
attested: TableAttestedCandidate<
ParaId,
CommittedCandidateReceipt,
ValidatorIndex,
ValidatorSignature,
>,
table_context: &TableContext,
) -> Option {
let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;
let (ids, validity_votes): (Vec<_>, Vec) = validity_votes
.into_iter()
.map(|(id, vote)| (id, vote.into()))
.unzip();
let group = table_context.groups.get(¶_id)?;
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
// The order of the validity votes in the backed candidate must match
// the order of bits set in the bitfield, which is not necessarily
// the order of the `validity_votes` we got from the table.
let mut vote_positions = Vec::with_capacity(validity_votes.len());
for (orig_idx, id) in ids.iter().enumerate() {
if let Some(position) = group.iter().position(|x| x == id) {
validator_indices.set(position, true);
vote_positions.push((orig_idx, position));
} else {
tracing::warn!(
target: LOG_TARGET,
"Logic error: Validity vote from table does not correspond to group",
);
return None;
}
}
vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
Some(BackedCandidate {
candidate,
validity_votes: vote_positions.into_iter()
.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
.collect(),
validator_indices,
})
}
async fn store_available_data(
sender: &mut JobSender,
id: Option,
n_validators: u32,
candidate_hash: CandidateHash,
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
sender.send_message(AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
id,
n_validators,
available_data,
tx,
).into()).await;
let _ = rx.await.map_err(Error::StoreAvailableData)?;
Ok(())
}
// Make a `PoV` available.
//
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
#[tracing::instrument(level = "trace", skip(sender, pov, span), fields(subsystem = LOG_TARGET))]
async fn make_pov_available(
sender: &mut JobSender,
validator_index: Option,
n_validators: usize,
pov: Arc,
candidate_hash: CandidateHash,
validation_data: polkadot_primitives::v1::PersistedValidationData,
expected_erasure_root: Hash,
span: Option<&jaeger::Span>,
) -> Result, Error> {
let available_data = AvailableData {
pov,
validation_data,
};
{
let _span = span.as_ref().map(|s| {
s.child("erasure-coding").with_candidate(candidate_hash)
});
let chunks = erasure_coding::obtain_chunks_v1(
n_validators,
&available_data,
)?;
let branches = erasure_coding::branches(chunks.as_ref());
let erasure_root = branches.root();
if erasure_root != expected_erasure_root {
return Ok(Err(InvalidErasureRoot));
}
}
{
let _span = span.as_ref().map(|s|
s.child("store-data").with_candidate(candidate_hash)
);
store_available_data(
sender,
validator_index,
n_validators as u32,
candidate_hash,
available_data,
).await?;
}
Ok(Ok(()))
}
async fn request_pov(
sender: &mut JobSender,
relay_parent: Hash,
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
pov_hash: Hash,
) -> Result, Error> {
let (tx, rx) = oneshot::channel();
sender.send_message(AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
}.into()).await;
let pov = rx.await.map_err(|_| Error::FetchPoV)?;
Ok(Arc::new(pov))
}
async fn request_candidate_validation(
sender: &mut JobSender,
candidate: CandidateDescriptor,
pov: Arc,
) -> Result {
let (tx, rx) = oneshot::channel();
sender.send_message(AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
).into()
).await;
match rx.await {
Ok(Ok(validation_result)) => Ok(validation_result),
Ok(Err(err)) => Err(Error::ValidationFailed(err)),
Err(err) => Err(Error::ValidateFromChainState(err)),
}
}
type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc), CandidateReceipt>;
struct BackgroundValidationParams {
sender: JobSender,
tx_command: mpsc::Sender,
candidate: CandidateReceipt,
relay_parent: Hash,
pov: PoVData,
validator_index: Option,
n_validators: usize,
span: Option,
make_command: F,
}
async fn validate_and_make_available(
params: BackgroundValidationParams<
impl SubsystemSender,
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
>
) -> Result<(), Error> {
let BackgroundValidationParams {
mut sender,
mut tx_command,
candidate,
relay_parent,
pov,
validator_index,
n_validators,
span,
make_command,
} = params;
let pov = match pov {
PoVData::Ready(pov) => pov,
PoVData::FetchFromValidator {
from_validator,
candidate_hash,
pov_hash,
} => {
let _span = span.as_ref().map(|s| s.child("request-pov"));
match request_pov(
&mut sender,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
).await {
Err(Error::FetchPoV) => {
tx_command.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash())).await.map_err(Error::Mpsc)?;
return Ok(())
}
Err(err) => return Err(err),
Ok(pov) => pov,
}
}
};
let v = {
let _span = span.as_ref().map(|s| {
s.child("request-validation")
.with_pov(&pov)
.with_para_id(candidate.descriptor().para_id)
});
request_candidate_validation(&mut sender, candidate.descriptor.clone(), pov.clone()).await?
};
let expected_commitments_hash = candidate.commitments_hash;
let res = match v {
ValidationResult::Valid(commitments, validation_data) => {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
"Validation successful",
);
// If validation produces a new set of commitments, we vote the candidate as invalid.
if commitments.hash() != expected_commitments_hash {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
actual_commitments = ?commitments,
"Commitments obtained with validation don't match the announced by the candidate receipt",
);
Err(candidate)
} else {
let erasure_valid = make_pov_available(
&mut sender,
validator_index,
n_validators,
pov.clone(),
candidate.hash(),
validation_data,
candidate.descriptor.erasure_root,
span.as_ref(),
).await?;
match erasure_valid {
Ok(()) => Ok((candidate, commitments, pov.clone())),
Err(InvalidErasureRoot) => {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
actual_commitments = ?commitments,
"Erasure root doesn't match the announced by the candidate receipt",
);
Err(candidate)
},
}
}
}
ValidationResult::Invalid(reason) => {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
reason = ?reason,
"Validation yielded an invalid candidate",
);
Err(candidate)
}
};
tx_command.send(make_command(res)).await.map_err(Into::into)
}
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(
mut self,
mut sender: JobSender,
mut rx_to: mpsc::Receiver,
span: PerLeafSpan,
) -> Result<(), Error> {
loop {
futures::select! {
validated_command = self.background_validation.next() => {
let _span = span.child("process-validation-result");
if let Some(c) = validated_command {
self.handle_validated_candidate_command(&span, &mut sender, c).await?;
} else {
panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
}
}
to_job = rx_to.next() => match to_job {
None => break,
Some(msg) => {
// we intentionally want spans created in `process_msg` to descend from the
// `span ` which is longer-lived than this ephemeral timing span.
let _timing_span = span.child("process-message");
self.process_msg(&span, &mut sender, msg).await?;
}
}
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, root_span, sender), fields(subsystem = LOG_TARGET))]
async fn handle_validated_candidate_command(
&mut self,
root_span: &jaeger::Span,
sender: &mut JobSender,
command: ValidatedCandidateCommand,
) -> Result<(), Error> {
let candidate_hash = command.candidate_hash();
self.awaiting_validation.remove(&candidate_hash);
match command {
ValidatedCandidateCommand::Second(res) => {
match res {
Ok((candidate, commitments, _)) => {
// sanity check.
if self.seconded.is_none() && !self.issued_statements.contains(&candidate_hash) {
self.seconded = Some(candidate_hash);
self.issued_statements.insert(candidate_hash);
self.metrics.on_candidate_seconded();
let statement = Statement::Seconded(CommittedCandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments,
});
if let Some(stmt) = self.sign_import_and_distribute_statement(
sender,
statement,
root_span,
).await? {
sender.send_message(
CandidateSelectionMessage::Seconded(self.parent, stmt).into()
).await;
}
}
}
Err(candidate) => {
sender.send_message(
CandidateSelectionMessage::Invalid(self.parent, candidate).into()
).await;
}
}
}
ValidatedCandidateCommand::Attest(res) => {
// We are done - avoid new validation spawns:
self.fallbacks.remove(&candidate_hash);
// sanity check.
if !self.issued_statements.contains(&candidate_hash) {
if res.is_ok() {
let statement = Statement::Valid(candidate_hash);
self.sign_import_and_distribute_statement(sender, statement, &root_span).await?;
}
self.issued_statements.insert(candidate_hash);
}
}
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
if let Some((attesting, span)) = self.fallbacks.get_mut(&candidate_hash) {
if let Some(index) = attesting.backing.pop() {
attesting.from_validator = index;
// Ok, another try:
let c_span = span.as_ref().map(|s| s.child("try"));
let attesting = attesting.clone();
self.kick_off_validation_work(sender, attesting, c_span).await?
}
} else {
tracing::warn!(
target: LOG_TARGET,
"AttestNoPoV was triggered without fallback being available."
);
debug_assert!(false);
}
}
}
Ok(())
}
/// Spawns `validate_and_make_available` for the candidate in `params` as a background task.
///
/// Does nothing if a validation of the same candidate is already in flight (tracked in
/// `awaiting_validation`). Errors of the background task itself are only logged.
///
/// # Errors
///
/// Returns an error if the spawn command cannot be sent.
#[tracing::instrument(level = "trace", skip(self, sender, params), fields(subsystem = LOG_TARGET))]
async fn background_validate_and_make_available(
    &mut self,
    sender: &mut JobSender<impl SubsystemSender>,
    params: BackgroundValidationParams<
        impl SubsystemSender,
        impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync
    >,
) -> Result<(), Error> {
    let candidate_hash = params.candidate.hash();

    // `insert` returns `false` if the hash was already present, i.e. validation is in flight.
    if self.awaiting_validation.insert(candidate_hash) {
        // spawn background task.
        let bg = async move {
            if let Err(e) = validate_and_make_available(params).await {
                tracing::error!(target: LOG_TARGET, "Failed to validate and make available: {:?}", e);
            }
        };
        sender.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
    }

    Ok(())
}
/// Kick off background validation with intent to second.
#[tracing::instrument(level = "trace", skip(self, parent_span, sender, pov), fields(subsystem = LOG_TARGET))]
async fn validate_and_second(
&mut self,
parent_span: &jaeger::Span,
root_span: &jaeger::Span,
sender: &mut JobSender,
candidate: &CandidateReceipt,
pov: Arc,
) -> Result<(), Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
sender.send_message(
CandidateSelectionMessage::Invalid(self.parent, candidate.clone()).into()
).await;
return Ok(());
}
let candidate_hash = candidate.hash();
let mut span = self.get_unbacked_validation_child(
root_span,
candidate_hash,
candidate.descriptor().para_id,
);
span.as_mut().map(|span| span.add_follows_from(parent_span));
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
candidate_receipt = ?candidate,
"Validate and second candidate",
);
let bg_sender = sender.clone();
self.background_validate_and_make_available(
sender,
BackgroundValidationParams {
sender: bg_sender,
tx_command: self.background_validation_tx.clone(),
candidate: candidate.clone(),
relay_parent: self.parent,
pov: PoVData::Ready(pov),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Second,
}
).await?;
Ok(())
}
async fn sign_import_and_distribute_statement(
&mut self,
sender: &mut JobSender,
statement: Statement,
root_span: &jaeger::Span,
) -> Result