mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Move candidate validation to the background (#2028)
* refactor some functions to not rely on `self` * factor out common elements of seconding and attesting * Add Spawn to backing FromJob * do candidate validation in background * tests * address grumbles
This commit is contained in:
committed by
GitHub
parent
536dceb4f6
commit
1008465413
@@ -31,7 +31,7 @@ use polkadot_primitives::v1::{
|
||||
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
|
||||
ValidatorIndex, SigningContext, PoV, CandidateHash,
|
||||
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
|
||||
CoreState, CoreIndex, CollatorId, ValidityAttestation,
|
||||
CoreState, CoreIndex, CollatorId, ValidityAttestation, CandidateCommitments,
|
||||
};
|
||||
use polkadot_node_primitives::{
|
||||
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
|
||||
@@ -87,6 +87,36 @@ enum Error {
|
||||
UtilError(#[from] util::Error),
|
||||
}
|
||||
|
||||
enum ValidatedCandidateCommand {
|
||||
// We were instructed to second the candidate.
|
||||
Second(BackgroundValidationResult),
|
||||
// We were instructed to validate the candidate.
|
||||
Attest(BackgroundValidationResult),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ValidatedCandidateCommand {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
let candidate_hash = self.candidate_hash();
|
||||
match *self {
|
||||
ValidatedCandidateCommand::Second(_) =>
|
||||
write!(f, "Second({})", candidate_hash),
|
||||
ValidatedCandidateCommand::Attest(_) =>
|
||||
write!(f, "Attest({})", candidate_hash),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ValidatedCandidateCommand {
|
||||
fn candidate_hash(&self) -> CandidateHash {
|
||||
match *self {
|
||||
ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(),
|
||||
ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
|
||||
ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
|
||||
ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds all data needed for candidate backing job operation.
|
||||
struct CandidateBackingJob {
|
||||
/// The hash of the relay parent on top of which this job is doing it's work.
|
||||
@@ -99,8 +129,10 @@ struct CandidateBackingJob {
|
||||
assignment: ParaId,
|
||||
/// The collator required to author the candidate, if any.
|
||||
required_collator: Option<CollatorId>,
|
||||
/// We issued `Valid` or `Invalid` statements on about these candidates.
|
||||
/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
|
||||
issued_statements: HashSet<CandidateHash>,
|
||||
/// These candidates are undergoing validation in the background.
|
||||
awaiting_validation: HashSet<CandidateHash>,
|
||||
/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
|
||||
seconded: Option<CandidateHash>,
|
||||
/// The candidates that are includable, by hash. Each entry here indicates
|
||||
@@ -111,6 +143,8 @@ struct CandidateBackingJob {
|
||||
keystore: SyncCryptoStorePtr,
|
||||
table: Table<TableContext>,
|
||||
table_context: TableContext,
|
||||
background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
|
||||
background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
@@ -219,19 +253,272 @@ fn table_attested_to_backed(
|
||||
})
|
||||
}
|
||||
|
||||
async fn store_available_data(
|
||||
tx_from: &mut mpsc::Sender<FromJobCommand>,
|
||||
id: Option<ValidatorIndex>,
|
||||
n_validators: u32,
|
||||
candidate_hash: CandidateHash,
|
||||
available_data: AvailableData,
|
||||
) -> Result<(), Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
tx_from.send(AllMessages::AvailabilityStore(
|
||||
AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate_hash,
|
||||
id,
|
||||
n_validators,
|
||||
available_data,
|
||||
tx,
|
||||
)
|
||||
).into()
|
||||
).await?;
|
||||
|
||||
let _ = rx.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Make a `PoV` available.
|
||||
//
|
||||
// This will compute the erasure root internally and compare it to the expected erasure root.
|
||||
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
|
||||
#[tracing::instrument(level = "trace", skip(tx_from, pov), fields(subsystem = LOG_TARGET))]
|
||||
async fn make_pov_available(
|
||||
tx_from: &mut mpsc::Sender<FromJobCommand>,
|
||||
validator_index: Option<ValidatorIndex>,
|
||||
n_validators: usize,
|
||||
pov: Arc<PoV>,
|
||||
candidate_hash: CandidateHash,
|
||||
validation_data: polkadot_primitives::v1::PersistedValidationData,
|
||||
expected_erasure_root: Hash,
|
||||
) -> Result<Result<(), InvalidErasureRoot>, Error> {
|
||||
let available_data = AvailableData {
|
||||
pov,
|
||||
validation_data,
|
||||
};
|
||||
|
||||
let chunks = erasure_coding::obtain_chunks_v1(
|
||||
n_validators,
|
||||
&available_data,
|
||||
)?;
|
||||
|
||||
let branches = erasure_coding::branches(chunks.as_ref());
|
||||
let erasure_root = branches.root();
|
||||
|
||||
if erasure_root != expected_erasure_root {
|
||||
return Ok(Err(InvalidErasureRoot));
|
||||
}
|
||||
|
||||
store_available_data(
|
||||
tx_from,
|
||||
validator_index,
|
||||
n_validators as u32,
|
||||
candidate_hash,
|
||||
available_data,
|
||||
).await?;
|
||||
|
||||
Ok(Ok(()))
|
||||
}
|
||||
|
||||
async fn request_pov_from_distribution(
|
||||
tx_from: &mut mpsc::Sender<FromJobCommand>,
|
||||
parent: Hash,
|
||||
descriptor: CandidateDescriptor,
|
||||
) -> Result<Arc<PoV>, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
tx_from.send(AllMessages::PoVDistribution(
|
||||
PoVDistributionMessage::FetchPoV(parent, descriptor, tx)
|
||||
).into()).await?;
|
||||
|
||||
Ok(rx.await?)
|
||||
}
|
||||
|
||||
async fn request_candidate_validation(
|
||||
tx_from: &mut mpsc::Sender<FromJobCommand>,
|
||||
candidate: CandidateDescriptor,
|
||||
pov: Arc<PoV>,
|
||||
) -> Result<ValidationResult, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
tx_from.send(AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromChainState(
|
||||
candidate,
|
||||
pov,
|
||||
tx,
|
||||
)
|
||||
).into()
|
||||
).await?;
|
||||
|
||||
Ok(rx.await??)
|
||||
}
|
||||
|
||||
type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
|
||||
|
||||
struct BackgroundValidationParams<F> {
|
||||
tx_from: mpsc::Sender<FromJobCommand>,
|
||||
tx_command: mpsc::Sender<ValidatedCandidateCommand>,
|
||||
candidate: CandidateReceipt,
|
||||
relay_parent: Hash,
|
||||
pov: Option<Arc<PoV>>,
|
||||
validator_index: Option<ValidatorIndex>,
|
||||
n_validators: usize,
|
||||
make_command: F,
|
||||
}
|
||||
|
||||
async fn validate_and_make_available(
|
||||
params: BackgroundValidationParams<impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand>,
|
||||
) -> Result<(), Error> {
|
||||
let BackgroundValidationParams {
|
||||
mut tx_from,
|
||||
mut tx_command,
|
||||
candidate,
|
||||
relay_parent,
|
||||
pov,
|
||||
validator_index,
|
||||
n_validators,
|
||||
make_command,
|
||||
} = params;
|
||||
|
||||
let pov = match pov {
|
||||
Some(pov) => pov,
|
||||
None => request_pov_from_distribution(
|
||||
&mut tx_from,
|
||||
relay_parent,
|
||||
candidate.descriptor.clone(),
|
||||
).await?,
|
||||
};
|
||||
|
||||
let v = request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?;
|
||||
|
||||
let expected_commitments_hash = candidate.commitments_hash;
|
||||
|
||||
let res = match v {
|
||||
ValidationResult::Valid(commitments, validation_data) => {
|
||||
// If validation produces a new set of commitments, we vote the candidate as invalid.
|
||||
if commitments.hash() != expected_commitments_hash {
|
||||
Err(candidate)
|
||||
} else {
|
||||
let erasure_valid = make_pov_available(
|
||||
&mut tx_from,
|
||||
validator_index,
|
||||
n_validators,
|
||||
pov.clone(),
|
||||
candidate.hash(),
|
||||
validation_data,
|
||||
candidate.descriptor.erasure_root,
|
||||
).await?;
|
||||
|
||||
match erasure_valid {
|
||||
Ok(()) => Ok((candidate, commitments, pov.clone())),
|
||||
Err(InvalidErasureRoot) => Err(candidate),
|
||||
}
|
||||
}
|
||||
}
|
||||
ValidationResult::Invalid(_reason) => {
|
||||
Err(candidate)
|
||||
}
|
||||
};
|
||||
|
||||
let command = make_command(res);
|
||||
tx_command.send(command).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl CandidateBackingJob {
|
||||
/// Run asynchronously.
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
loop {
|
||||
match self.rx_to.next().await {
|
||||
Some(msg) => self.process_msg(msg).await?,
|
||||
None => break,
|
||||
futures::select! {
|
||||
validated_command = self.background_validation.next() => {
|
||||
if let Some(c) = validated_command {
|
||||
self.handle_validated_candidate_command(c).await?;
|
||||
} else {
|
||||
panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
|
||||
}
|
||||
}
|
||||
to_job = self.rx_to.next() => match to_job {
|
||||
None => break,
|
||||
Some(msg) => {
|
||||
self.process_msg(msg).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
async fn handle_validated_candidate_command(
|
||||
&mut self,
|
||||
command: ValidatedCandidateCommand,
|
||||
) -> Result<(), Error> {
|
||||
let candidate_hash = command.candidate_hash();
|
||||
self.awaiting_validation.remove(&candidate_hash);
|
||||
|
||||
match command {
|
||||
ValidatedCandidateCommand::Second(res) => {
|
||||
match res {
|
||||
Ok((candidate, commitments, pov)) => {
|
||||
// sanity check.
|
||||
if self.seconded.is_none() && !self.issued_statements.contains(&candidate_hash) {
|
||||
self.seconded = Some(candidate_hash);
|
||||
self.issued_statements.insert(candidate_hash);
|
||||
self.metrics.on_candidate_seconded();
|
||||
|
||||
let statement = Statement::Seconded(CommittedCandidateReceipt {
|
||||
descriptor: candidate.descriptor.clone(),
|
||||
commitments,
|
||||
});
|
||||
self.sign_import_and_distribute_statement(statement).await?;
|
||||
self.distribute_pov(candidate.descriptor, pov).await?;
|
||||
}
|
||||
}
|
||||
Err(candidate) => {
|
||||
self.issue_candidate_invalid_message(candidate).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
ValidatedCandidateCommand::Attest(res) => {
|
||||
// sanity check.
|
||||
if !self.issued_statements.contains(&candidate_hash) {
|
||||
let statement = if res.is_ok() {
|
||||
Statement::Valid(candidate_hash)
|
||||
} else {
|
||||
Statement::Invalid(candidate_hash)
|
||||
};
|
||||
|
||||
self.issued_statements.insert(candidate_hash);
|
||||
self.sign_import_and_distribute_statement(statement).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))]
|
||||
async fn background_validate_and_make_available(
|
||||
&mut self,
|
||||
params: BackgroundValidationParams<
|
||||
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static
|
||||
>,
|
||||
) -> Result<(), Error> {
|
||||
let candidate_hash = params.candidate.hash();
|
||||
|
||||
if self.awaiting_validation.insert(candidate_hash) {
|
||||
// spawn background task.
|
||||
let bg = async move {
|
||||
if let Err(e) = validate_and_make_available(params).await {
|
||||
tracing::error!("Failed to validate and make available: {:?}", e);
|
||||
}
|
||||
};
|
||||
self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn issue_candidate_invalid_message(
|
||||
&mut self,
|
||||
candidate: CandidateReceipt,
|
||||
@@ -241,83 +528,33 @@ impl CandidateBackingJob {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
|
||||
///
|
||||
/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
|
||||
/// Kick off background validation with intent to second.
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
async fn validate_and_second(
|
||||
&mut self,
|
||||
candidate: &CandidateReceipt,
|
||||
pov: Arc<PoV>,
|
||||
) -> Result<bool, Error> {
|
||||
) -> Result<(), Error> {
|
||||
// Check that candidate is collated by the right collator.
|
||||
if self.required_collator.as_ref()
|
||||
.map_or(false, |c| c != &candidate.descriptor().collator)
|
||||
{
|
||||
self.issue_candidate_invalid_message(candidate.clone()).await?;
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let valid = self.request_candidate_validation(
|
||||
candidate.descriptor().clone(),
|
||||
pov.clone(),
|
||||
).await?;
|
||||
self.background_validate_and_make_available(BackgroundValidationParams {
|
||||
tx_from: self.tx_from.clone(),
|
||||
tx_command: self.background_validation_tx.clone(),
|
||||
candidate: candidate.clone(),
|
||||
relay_parent: self.parent,
|
||||
pov: Some(pov),
|
||||
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
n_validators: self.table_context.validators.len(),
|
||||
make_command: ValidatedCandidateCommand::Second,
|
||||
}).await?;
|
||||
|
||||
let candidate_hash = candidate.hash();
|
||||
|
||||
let statement = match valid {
|
||||
ValidationResult::Valid(commitments, validation_data) => {
|
||||
// make PoV available for later distribution. Send data to the availability
|
||||
// store to keep. Sign and dispatch `valid` statement to network if we
|
||||
// have not seconded the given candidate.
|
||||
//
|
||||
// If the commitments hash produced by validation is not the same as given by
|
||||
// the collator, do not make available and report the collator.
|
||||
if candidate.commitments_hash != commitments.hash() {
|
||||
self.issue_candidate_invalid_message(candidate.clone()).await?;
|
||||
None
|
||||
} else {
|
||||
let erasure_valid = self.make_pov_available(
|
||||
pov,
|
||||
candidate_hash,
|
||||
validation_data,
|
||||
candidate.descriptor.erasure_root,
|
||||
).await?;
|
||||
|
||||
match erasure_valid {
|
||||
Ok(()) => {
|
||||
let candidate = CommittedCandidateReceipt {
|
||||
descriptor: candidate.descriptor().clone(),
|
||||
commitments,
|
||||
};
|
||||
|
||||
self.issued_statements.insert(candidate_hash);
|
||||
Some(Statement::Seconded(candidate))
|
||||
}
|
||||
Err(InvalidErasureRoot) => {
|
||||
self.issue_candidate_invalid_message(candidate.clone()).await?;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ValidationResult::Invalid(_reason) => {
|
||||
// no need to issue a statement about this if we aren't seconding it.
|
||||
//
|
||||
// there's an infinite amount of garbage out there. no need to acknowledge
|
||||
// all of it.
|
||||
self.issue_candidate_invalid_message(candidate.clone()).await?;
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let issued_statement = statement.is_some();
|
||||
|
||||
if let Some(statement) = statement {
|
||||
self.sign_import_and_distribute_statement(statement).await?
|
||||
}
|
||||
|
||||
Ok(issued_statement)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> {
|
||||
@@ -418,7 +655,6 @@ impl CandidateBackingJob {
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
|
||||
|
||||
match msg {
|
||||
CandidateBackingMessage::Second(_, candidate, pov) => {
|
||||
let _timer = self.metrics.time_process_second();
|
||||
@@ -437,14 +673,7 @@ impl CandidateBackingJob {
|
||||
let pov = Arc::new(pov);
|
||||
|
||||
if !self.issued_statements.contains(&candidate_hash) {
|
||||
if let Ok(true) = self.validate_and_second(
|
||||
&candidate,
|
||||
pov.clone(),
|
||||
).await {
|
||||
self.metrics.on_candidate_seconded();
|
||||
self.seconded = Some(candidate_hash);
|
||||
self.distribute_pov(candidate.descriptor, pov).await?;
|
||||
}
|
||||
self.validate_and_second(&candidate, pov.clone()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -485,10 +714,7 @@ impl CandidateBackingJob {
|
||||
// We clone the commitments here because there are borrowck
|
||||
// errors relating to this being a struct and methods borrowing the entirety of self
|
||||
// and not just those things that the function uses.
|
||||
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
|
||||
let expected_commitments = candidate.commitments.clone();
|
||||
let expected_erasure_root = candidate.descriptor.erasure_root;
|
||||
|
||||
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain();
|
||||
let descriptor = candidate.descriptor().clone();
|
||||
|
||||
// Check that candidate is collated by the right collator.
|
||||
@@ -503,36 +729,16 @@ impl CandidateBackingJob {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
|
||||
let v = self.request_candidate_validation(descriptor, pov.clone()).await?;
|
||||
|
||||
let statement = match v {
|
||||
ValidationResult::Valid(commitments, validation_data) => {
|
||||
// If validation produces a new set of commitments, we vote the candidate as invalid.
|
||||
if commitments != expected_commitments {
|
||||
Statement::Invalid(candidate_hash)
|
||||
} else {
|
||||
let erasure_valid = self.make_pov_available(
|
||||
pov,
|
||||
candidate_hash,
|
||||
validation_data,
|
||||
expected_erasure_root,
|
||||
).await?;
|
||||
|
||||
match erasure_valid {
|
||||
Ok(()) => Statement::Valid(candidate_hash),
|
||||
Err(InvalidErasureRoot) => Statement::Invalid(candidate_hash),
|
||||
}
|
||||
}
|
||||
}
|
||||
ValidationResult::Invalid(_reason) => {
|
||||
Statement::Invalid(candidate_hash)
|
||||
}
|
||||
};
|
||||
|
||||
self.issued_statements.insert(candidate_hash);
|
||||
|
||||
self.sign_import_and_distribute_statement(statement).await
|
||||
self.background_validate_and_make_available(BackgroundValidationParams {
|
||||
tx_from: self.tx_from.clone(),
|
||||
tx_command: self.background_validation_tx.clone(),
|
||||
candidate: candidate,
|
||||
relay_parent: self.parent,
|
||||
pov: None,
|
||||
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
n_validators: self.table_context.validators.len(),
|
||||
make_command: ValidatedCandidateCommand::Attest,
|
||||
}).await
|
||||
}
|
||||
|
||||
/// Import the statement and kick off validation work if it is a part of our assignment.
|
||||
@@ -596,102 +802,6 @@ impl CandidateBackingJob {
|
||||
).into()).await.map_err(Into::into)
|
||||
}
|
||||
|
||||
async fn request_pov_from_distribution(
|
||||
&mut self,
|
||||
descriptor: CandidateDescriptor,
|
||||
) -> Result<Arc<PoV>, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.tx_from.send(AllMessages::from(
|
||||
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
|
||||
).into()).await?;
|
||||
|
||||
Ok(rx.await?)
|
||||
}
|
||||
|
||||
async fn request_candidate_validation(
|
||||
&mut self,
|
||||
candidate: CandidateDescriptor,
|
||||
pov: Arc<PoV>,
|
||||
) -> Result<ValidationResult, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.tx_from.send(
|
||||
AllMessages::from(
|
||||
CandidateValidationMessage::ValidateFromChainState(
|
||||
candidate,
|
||||
pov,
|
||||
tx,
|
||||
)
|
||||
).into(),
|
||||
).await?;
|
||||
|
||||
Ok(rx.await??)
|
||||
}
|
||||
|
||||
async fn store_available_data(
|
||||
&mut self,
|
||||
id: Option<ValidatorIndex>,
|
||||
n_validators: u32,
|
||||
candidate_hash: CandidateHash,
|
||||
available_data: AvailableData,
|
||||
) -> Result<(), Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx_from.send(AllMessages::from(
|
||||
AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate_hash,
|
||||
id,
|
||||
n_validators,
|
||||
available_data,
|
||||
tx,
|
||||
)
|
||||
).into(),
|
||||
).await?;
|
||||
|
||||
let _ = rx.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Make a `PoV` available.
|
||||
//
|
||||
// This will compute the erasure root internally and compare it to the expected erasure root.
|
||||
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
|
||||
#[tracing::instrument(level = "trace", skip(self, pov), fields(subsystem = LOG_TARGET))]
|
||||
async fn make_pov_available(
|
||||
&mut self,
|
||||
pov: Arc<PoV>,
|
||||
candidate_hash: CandidateHash,
|
||||
validation_data: polkadot_primitives::v1::PersistedValidationData,
|
||||
expected_erasure_root: Hash,
|
||||
) -> Result<Result<(), InvalidErasureRoot>, Error> {
|
||||
let available_data = AvailableData {
|
||||
pov,
|
||||
validation_data,
|
||||
};
|
||||
|
||||
let chunks = erasure_coding::obtain_chunks_v1(
|
||||
self.table_context.validators.len(),
|
||||
&available_data,
|
||||
)?;
|
||||
|
||||
let branches = erasure_coding::branches(chunks.as_ref());
|
||||
let erasure_root = branches.root();
|
||||
|
||||
if erasure_root != expected_erasure_root {
|
||||
return Ok(Err(InvalidErasureRoot));
|
||||
}
|
||||
|
||||
self.store_available_data(
|
||||
self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
self.table_context.validators.len() as u32,
|
||||
candidate_hash,
|
||||
available_data,
|
||||
).await?;
|
||||
|
||||
Ok(Ok(()))
|
||||
}
|
||||
|
||||
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
|
||||
let smsg = StatementDistributionMessage::Share(self.parent, s);
|
||||
|
||||
@@ -804,6 +914,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
Some(r) => r,
|
||||
};
|
||||
|
||||
let (background_tx, background_rx) = mpsc::channel(16);
|
||||
let job = CandidateBackingJob {
|
||||
parent,
|
||||
rx_to,
|
||||
@@ -811,12 +922,15 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
assignment,
|
||||
required_collator,
|
||||
issued_statements: HashSet::new(),
|
||||
awaiting_validation: HashSet::new(),
|
||||
seconded: None,
|
||||
backed: HashSet::new(),
|
||||
reported_misbehavior_for: HashSet::new(),
|
||||
keystore,
|
||||
table: Table::default(),
|
||||
table_context,
|
||||
background_validation: background_rx,
|
||||
background_validation_tx: background_tx,
|
||||
metrics,
|
||||
};
|
||||
|
||||
@@ -925,9 +1039,8 @@ mod tests {
|
||||
use assert_matches::assert_matches;
|
||||
use futures::{future, Future};
|
||||
use polkadot_primitives::v1::{
|
||||
ScheduledCore, BlockData, CandidateCommitments,
|
||||
PersistedValidationData, ValidationData, TransientValidationData, HeadData,
|
||||
GroupRotationInfo,
|
||||
ScheduledCore, BlockData, PersistedValidationData, ValidationData,
|
||||
TransientValidationData, HeadData, GroupRotationInfo,
|
||||
};
|
||||
use polkadot_subsystem::{
|
||||
messages::{RuntimeApiRequest, RuntimeApiMessage},
|
||||
@@ -1357,13 +1470,6 @@ mod tests {
|
||||
}
|
||||
);
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(
|
||||
test_state.relay_parent,
|
||||
signed_b.clone(),
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::StatementDistribution(
|
||||
@@ -1374,6 +1480,13 @@ mod tests {
|
||||
}
|
||||
);
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(
|
||||
test_state.relay_parent,
|
||||
signed_b.clone(),
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
@@ -1404,6 +1517,152 @@ mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backing_works_while_validation_ongoing() {
|
||||
let test_state = TestState::default();
|
||||
test_harness(test_state.keystore.clone(), |test_harness| async move {
|
||||
let TestHarness { mut virtual_overseer } = test_harness;
|
||||
|
||||
test_startup(&mut virtual_overseer, &test_state).await;
|
||||
|
||||
let pov = PoV {
|
||||
block_data: BlockData(vec![1, 2, 3]),
|
||||
};
|
||||
|
||||
let pov_hash = pov.hash();
|
||||
|
||||
let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
|
||||
|
||||
let candidate_a = TestCandidateBuilder {
|
||||
para_id: test_state.chain_ids[0],
|
||||
relay_parent: test_state.relay_parent,
|
||||
pov_hash,
|
||||
head_data: expected_head_data.clone(),
|
||||
erasure_root: make_erasure_root(&test_state, pov.clone()),
|
||||
..Default::default()
|
||||
}.build();
|
||||
|
||||
let candidate_a_hash = candidate_a.hash();
|
||||
let public1 = CryptoStore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[5].to_seed()),
|
||||
).await.expect("Insert key into keystore");
|
||||
let public2 = CryptoStore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[2].to_seed()),
|
||||
).await.expect("Insert key into keystore");
|
||||
let public3 = CryptoStore::sr25519_generate_new(
|
||||
&*test_state.keystore,
|
||||
ValidatorId::ID,
|
||||
Some(&test_state.validators[3].to_seed()),
|
||||
).await.expect("Insert key into keystore");
|
||||
|
||||
let signed_a = SignedFullStatement::sign(
|
||||
&test_state.keystore,
|
||||
Statement::Seconded(candidate_a.clone()),
|
||||
&test_state.signing_context,
|
||||
2,
|
||||
&public2.into(),
|
||||
).await.expect("should be signed");
|
||||
|
||||
let signed_b = SignedFullStatement::sign(
|
||||
&test_state.keystore,
|
||||
Statement::Valid(candidate_a_hash),
|
||||
&test_state.signing_context,
|
||||
5,
|
||||
&public1.into(),
|
||||
).await.expect("should be signed");
|
||||
|
||||
let signed_c = SignedFullStatement::sign(
|
||||
&test_state.keystore,
|
||||
Statement::Valid(candidate_a_hash),
|
||||
&test_state.signing_context,
|
||||
3,
|
||||
&public3.into(),
|
||||
).await.expect("should be signed");
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
// Sending a `Statement::Seconded` for our assignment will start
|
||||
// validation process. The first thing requested is PoV from the
|
||||
// `PoVDistribution`.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::PoVDistribution(
|
||||
PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
|
||||
) if relay_parent == test_state.relay_parent => {
|
||||
tx.send(Arc::new(pov.clone())).unwrap();
|
||||
}
|
||||
);
|
||||
|
||||
// The next step is the actual request to Validation subsystem
|
||||
// to validate the `Seconded` candidate.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromChainState(
|
||||
c,
|
||||
pov,
|
||||
tx,
|
||||
)
|
||||
) if pov == pov && &c == candidate_a.descriptor() => {
|
||||
// we never validate the candidate. our local node
|
||||
// shouldn't issue any statements.
|
||||
std::mem::forget(tx);
|
||||
}
|
||||
);
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(
|
||||
test_state.relay_parent,
|
||||
signed_b.clone(),
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
let statement = CandidateBackingMessage::Statement(
|
||||
test_state.relay_parent,
|
||||
signed_c.clone(),
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
|
||||
|
||||
// Candidate gets backed entirely by other votes.
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::Provisioner(
|
||||
ProvisionerMessage::ProvisionableData(
|
||||
_,
|
||||
ProvisionableData::BackedCandidate(BackedCandidate {
|
||||
candidate,
|
||||
validity_votes,
|
||||
validator_indices,
|
||||
})
|
||||
)
|
||||
) if candidate == candidate_a => {
|
||||
assert_eq!(validity_votes.len(), 3);
|
||||
|
||||
assert!(validity_votes.contains(
|
||||
&ValidityAttestation::Implicit(signed_a.signature().clone())
|
||||
));
|
||||
assert!(validity_votes.contains(
|
||||
&ValidityAttestation::Explicit(signed_b.signature().clone())
|
||||
));
|
||||
assert!(validity_votes.contains(
|
||||
&ValidityAttestation::Explicit(signed_c.signature().clone())
|
||||
));
|
||||
assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 0, 1, 1]);
|
||||
}
|
||||
);
|
||||
|
||||
virtual_overseer.send(FromOverseer::Signal(
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
|
||||
).await;
|
||||
});
|
||||
}
|
||||
|
||||
// Issuing conflicting statements on the same candidate should
|
||||
// be a misbehavior.
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user