From 1008465413634d37bfe5ca928375f5012d4f21f7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 30 Nov 2020 12:01:03 -0500 Subject: [PATCH] Move candidate validation to the background (#2028) * refactor some functions to not rely on `self` * factor out common elements of seconding and attesting * Add Spawn to backing FromJob * do candidate validation in background * tests * address grumbles --- polkadot/node/core/backing/src/lib.rs | 695 ++++++++++++++++++-------- 1 file changed, 477 insertions(+), 218 deletions(-) diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 6adec2747a..aff2944e45 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -31,7 +31,7 @@ use polkadot_primitives::v1::{ CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId, ValidatorIndex, SigningContext, PoV, CandidateHash, CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt, - CoreState, CoreIndex, CollatorId, ValidityAttestation, + CoreState, CoreIndex, CollatorId, ValidityAttestation, CandidateCommitments, }; use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult, @@ -87,6 +87,36 @@ enum Error { UtilError(#[from] util::Error), } +enum ValidatedCandidateCommand { + // We were instructed to second the candidate. + Second(BackgroundValidationResult), + // We were instructed to validate the candidate. + Attest(BackgroundValidationResult), +} + +impl std::fmt::Debug for ValidatedCandidateCommand { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let candidate_hash = self.candidate_hash(); + match *self { + ValidatedCandidateCommand::Second(_) => + write!(f, "Second({})", candidate_hash), + ValidatedCandidateCommand::Attest(_) => + write!(f, "Attest({})", candidate_hash), + } + } +} + +impl ValidatedCandidateCommand { + fn candidate_hash(&self) -> CandidateHash { + match *self { + ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(), + ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(), + ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(), + ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(), + } + } +} + /// Holds all data needed for candidate backing job operation. struct CandidateBackingJob { /// The hash of the relay parent on top of which this job is doing it's work. @@ -99,8 +129,10 @@ struct CandidateBackingJob { assignment: ParaId, /// The collator required to author the candidate, if any. required_collator: Option, - /// We issued `Valid` or `Invalid` statements on about these candidates. + /// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates. issued_statements: HashSet, + /// These candidates are undergoing validation in the background. + awaiting_validation: HashSet, /// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash. seconded: Option, /// The candidates that are includable, by hash. Each entry here indicates @@ -111,6 +143,8 @@ struct CandidateBackingJob { keystore: SyncCryptoStorePtr, table: Table, table_context: TableContext, + background_validation: mpsc::Receiver, + background_validation_tx: mpsc::Sender, metrics: Metrics, } @@ -219,19 +253,272 @@ fn table_attested_to_backed( }) } +async fn store_available_data( + tx_from: &mut mpsc::Sender, + id: Option, + n_validators: u32, + candidate_hash: CandidateHash, + available_data: AvailableData, +) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + tx_from.send(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + id, + n_validators, + available_data, + tx, + ) + ).into() + ).await?; + + let _ = rx.await?; + + Ok(()) +} + +// Make a `PoV` available. +// +// This will compute the erasure root internally and compare it to the expected erasure root. +// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`. +#[tracing::instrument(level = "trace", skip(tx_from, pov), fields(subsystem = LOG_TARGET))] +async fn make_pov_available( + tx_from: &mut mpsc::Sender, + validator_index: Option, + n_validators: usize, + pov: Arc, + candidate_hash: CandidateHash, + validation_data: polkadot_primitives::v1::PersistedValidationData, + expected_erasure_root: Hash, +) -> Result, Error> { + let available_data = AvailableData { + pov, + validation_data, + }; + + let chunks = erasure_coding::obtain_chunks_v1( + n_validators, + &available_data, + )?; + + let branches = erasure_coding::branches(chunks.as_ref()); + let erasure_root = branches.root(); + + if erasure_root != expected_erasure_root { + return Ok(Err(InvalidErasureRoot)); + } + + store_available_data( + tx_from, + validator_index, + n_validators as u32, + candidate_hash, + available_data, + ).await?; + + Ok(Ok(())) +} + +async fn request_pov_from_distribution( + tx_from: &mut mpsc::Sender, + parent: Hash, + descriptor: CandidateDescriptor, +) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + + tx_from.send(AllMessages::PoVDistribution( + PoVDistributionMessage::FetchPoV(parent, descriptor, tx) + ).into()).await?; + + Ok(rx.await?) +} + +async fn request_candidate_validation( + tx_from: &mut mpsc::Sender, + candidate: CandidateDescriptor, + pov: Arc, +) -> Result { + let (tx, rx) = oneshot::channel(); + + tx_from.send(AllMessages::CandidateValidation( + CandidateValidationMessage::ValidateFromChainState( + candidate, + pov, + tx, + ) + ).into() + ).await?; + + Ok(rx.await??) +} + +type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc), CandidateReceipt>; + +struct BackgroundValidationParams { + tx_from: mpsc::Sender, + tx_command: mpsc::Sender, + candidate: CandidateReceipt, + relay_parent: Hash, + pov: Option>, + validator_index: Option, + n_validators: usize, + make_command: F, +} + +async fn validate_and_make_available( + params: BackgroundValidationParams ValidatedCandidateCommand>, +) -> Result<(), Error> { + let BackgroundValidationParams { + mut tx_from, + mut tx_command, + candidate, + relay_parent, + pov, + validator_index, + n_validators, + make_command, + } = params; + + let pov = match pov { + Some(pov) => pov, + None => request_pov_from_distribution( + &mut tx_from, + relay_parent, + candidate.descriptor.clone(), + ).await?, + }; + + let v = request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?; + + let expected_commitments_hash = candidate.commitments_hash; + + let res = match v { + ValidationResult::Valid(commitments, validation_data) => { + // If validation produces a new set of commitments, we vote the candidate as invalid. + if commitments.hash() != expected_commitments_hash { + Err(candidate) + } else { + let erasure_valid = make_pov_available( + &mut tx_from, + validator_index, + n_validators, + pov.clone(), + candidate.hash(), + validation_data, + candidate.descriptor.erasure_root, + ).await?; + + match erasure_valid { + Ok(()) => Ok((candidate, commitments, pov.clone())), + Err(InvalidErasureRoot) => Err(candidate), + } + } + } + ValidationResult::Invalid(_reason) => { + Err(candidate) + } + }; + + let command = make_command(res); + tx_command.send(command).await?; + Ok(()) +} + impl CandidateBackingJob { /// Run asynchronously. async fn run_loop(mut self) -> Result<(), Error> { loop { - match self.rx_to.next().await { - Some(msg) => self.process_msg(msg).await?, - None => break, + futures::select! { + validated_command = self.background_validation.next() => { + if let Some(c) = validated_command { + self.handle_validated_candidate_command(c).await?; + } else { + panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed"); + } + } + to_job = self.rx_to.next() => match to_job { + None => break, + Some(msg) => { + self.process_msg(msg).await?; + } + } } } Ok(()) } + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + async fn handle_validated_candidate_command( + &mut self, + command: ValidatedCandidateCommand, + ) -> Result<(), Error> { + let candidate_hash = command.candidate_hash(); + self.awaiting_validation.remove(&candidate_hash); + + match command { + ValidatedCandidateCommand::Second(res) => { + match res { + Ok((candidate, commitments, pov)) => { + // sanity check. + if self.seconded.is_none() && !self.issued_statements.contains(&candidate_hash) { + self.seconded = Some(candidate_hash); + self.issued_statements.insert(candidate_hash); + self.metrics.on_candidate_seconded(); + + let statement = Statement::Seconded(CommittedCandidateReceipt { + descriptor: candidate.descriptor.clone(), + commitments, + }); + self.sign_import_and_distribute_statement(statement).await?; + self.distribute_pov(candidate.descriptor, pov).await?; + } + } + Err(candidate) => { + self.issue_candidate_invalid_message(candidate).await?; + } + } + } + ValidatedCandidateCommand::Attest(res) => { + // sanity check. + if !self.issued_statements.contains(&candidate_hash) { + let statement = if res.is_ok() { + Statement::Valid(candidate_hash) + } else { + Statement::Invalid(candidate_hash) + }; + + self.issued_statements.insert(candidate_hash); + self.sign_import_and_distribute_statement(statement).await?; + } + } + } + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))] + async fn background_validate_and_make_available( + &mut self, + params: BackgroundValidationParams< + impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + >, + ) -> Result<(), Error> { + let candidate_hash = params.candidate.hash(); + + if self.awaiting_validation.insert(candidate_hash) { + // spawn background task. + let bg = async move { + if let Err(e) = validate_and_make_available(params).await { + tracing::error!("Failed to validate and make available: {:?}", e); + } + }; + self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?; + } + + Ok(()) + } + async fn issue_candidate_invalid_message( &mut self, candidate: CandidateReceipt, @@ -241,83 +528,33 @@ impl CandidateBackingJob { Ok(()) } - /// Validate the candidate that is requested to be `Second`ed and distribute validation result. - /// - /// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate. + /// Kick off background validation with intent to second. #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] async fn validate_and_second( &mut self, candidate: &CandidateReceipt, pov: Arc, - ) -> Result { + ) -> Result<(), Error> { // Check that candidate is collated by the right collator. if self.required_collator.as_ref() .map_or(false, |c| c != &candidate.descriptor().collator) { self.issue_candidate_invalid_message(candidate.clone()).await?; - return Ok(false); + return Ok(()); } - let valid = self.request_candidate_validation( - candidate.descriptor().clone(), - pov.clone(), - ).await?; + self.background_validate_and_make_available(BackgroundValidationParams { + tx_from: self.tx_from.clone(), + tx_command: self.background_validation_tx.clone(), + candidate: candidate.clone(), + relay_parent: self.parent, + pov: Some(pov), + validator_index: self.table_context.validator.as_ref().map(|v| v.index()), + n_validators: self.table_context.validators.len(), + make_command: ValidatedCandidateCommand::Second, + }).await?; - let candidate_hash = candidate.hash(); - - let statement = match valid { - ValidationResult::Valid(commitments, validation_data) => { - // make PoV available for later distribution. Send data to the availability - // store to keep. Sign and dispatch `valid` statement to network if we - // have not seconded the given candidate. - // - // If the commitments hash produced by validation is not the same as given by - // the collator, do not make available and report the collator. - if candidate.commitments_hash != commitments.hash() { - self.issue_candidate_invalid_message(candidate.clone()).await?; - None - } else { - let erasure_valid = self.make_pov_available( - pov, - candidate_hash, - validation_data, - candidate.descriptor.erasure_root, - ).await?; - - match erasure_valid { - Ok(()) => { - let candidate = CommittedCandidateReceipt { - descriptor: candidate.descriptor().clone(), - commitments, - }; - - self.issued_statements.insert(candidate_hash); - Some(Statement::Seconded(candidate)) - } - Err(InvalidErasureRoot) => { - self.issue_candidate_invalid_message(candidate.clone()).await?; - None - } - } - } - } - ValidationResult::Invalid(_reason) => { - // no need to issue a statement about this if we aren't seconding it. - // - // there's an infinite amount of garbage out there. no need to acknowledge - // all of it. - self.issue_candidate_invalid_message(candidate.clone()).await?; - None - } - }; - - let issued_statement = statement.is_some(); - - if let Some(statement) = statement { - self.sign_import_and_distribute_statement(statement).await? - } - - Ok(issued_statement) + Ok(()) } async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> { @@ -418,7 +655,6 @@ impl CandidateBackingJob { #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> { - match msg { CandidateBackingMessage::Second(_, candidate, pov) => { let _timer = self.metrics.time_process_second(); @@ -437,14 +673,7 @@ impl CandidateBackingJob { let pov = Arc::new(pov); if !self.issued_statements.contains(&candidate_hash) { - if let Ok(true) = self.validate_and_second( - &candidate, - pov.clone(), - ).await { - self.metrics.on_candidate_seconded(); - self.seconded = Some(candidate_hash); - self.distribute_pov(candidate.descriptor, pov).await?; - } + self.validate_and_second(&candidate, pov.clone()).await?; } } } @@ -485,10 +714,7 @@ impl CandidateBackingJob { // We clone the commitments here because there are borrowck // errors relating to this being a struct and methods borrowing the entirety of self // and not just those things that the function uses. - let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?; - let expected_commitments = candidate.commitments.clone(); - let expected_erasure_root = candidate.descriptor.erasure_root; - + let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain(); let descriptor = candidate.descriptor().clone(); // Check that candidate is collated by the right collator. @@ -503,36 +729,16 @@ impl CandidateBackingJob { return Ok(()); } - let pov = self.request_pov_from_distribution(descriptor.clone()).await?; - let v = self.request_candidate_validation(descriptor, pov.clone()).await?; - - let statement = match v { - ValidationResult::Valid(commitments, validation_data) => { - // If validation produces a new set of commitments, we vote the candidate as invalid. - if commitments != expected_commitments { - Statement::Invalid(candidate_hash) - } else { - let erasure_valid = self.make_pov_available( - pov, - candidate_hash, - validation_data, - expected_erasure_root, - ).await?; - - match erasure_valid { - Ok(()) => Statement::Valid(candidate_hash), - Err(InvalidErasureRoot) => Statement::Invalid(candidate_hash), - } - } - } - ValidationResult::Invalid(_reason) => { - Statement::Invalid(candidate_hash) - } - }; - - self.issued_statements.insert(candidate_hash); - - self.sign_import_and_distribute_statement(statement).await + self.background_validate_and_make_available(BackgroundValidationParams { + tx_from: self.tx_from.clone(), + tx_command: self.background_validation_tx.clone(), + candidate: candidate, + relay_parent: self.parent, + pov: None, + validator_index: self.table_context.validator.as_ref().map(|v| v.index()), + n_validators: self.table_context.validators.len(), + make_command: ValidatedCandidateCommand::Attest, + }).await } /// Import the statement and kick off validation work if it is a part of our assignment. @@ -596,102 +802,6 @@ impl CandidateBackingJob { ).into()).await.map_err(Into::into) } - async fn request_pov_from_distribution( - &mut self, - descriptor: CandidateDescriptor, - ) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - - self.tx_from.send(AllMessages::from( - PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx) - ).into()).await?; - - Ok(rx.await?) - } - - async fn request_candidate_validation( - &mut self, - candidate: CandidateDescriptor, - pov: Arc, - ) -> Result { - let (tx, rx) = oneshot::channel(); - - self.tx_from.send( - AllMessages::from( - CandidateValidationMessage::ValidateFromChainState( - candidate, - pov, - tx, - ) - ).into(), - ).await?; - - Ok(rx.await??) - } - - async fn store_available_data( - &mut self, - id: Option, - n_validators: u32, - candidate_hash: CandidateHash, - available_data: AvailableData, - ) -> Result<(), Error> { - let (tx, rx) = oneshot::channel(); - self.tx_from.send(AllMessages::from( - AvailabilityStoreMessage::StoreAvailableData( - candidate_hash, - id, - n_validators, - available_data, - tx, - ) - ).into(), - ).await?; - - let _ = rx.await?; - - Ok(()) - } - - // Make a `PoV` available. - // - // This will compute the erasure root internally and compare it to the expected erasure root. - // This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`. - #[tracing::instrument(level = "trace", skip(self, pov), fields(subsystem = LOG_TARGET))] - async fn make_pov_available( - &mut self, - pov: Arc, - candidate_hash: CandidateHash, - validation_data: polkadot_primitives::v1::PersistedValidationData, - expected_erasure_root: Hash, - ) -> Result, Error> { - let available_data = AvailableData { - pov, - validation_data, - }; - - let chunks = erasure_coding::obtain_chunks_v1( - self.table_context.validators.len(), - &available_data, - )?; - - let branches = erasure_coding::branches(chunks.as_ref()); - let erasure_root = branches.root(); - - if erasure_root != expected_erasure_root { - return Ok(Err(InvalidErasureRoot)); - } - - self.store_available_data( - self.table_context.validator.as_ref().map(|v| v.index()), - self.table_context.validators.len() as u32, - candidate_hash, - available_data, - ).await?; - - Ok(Ok(())) - } - async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> { let smsg = StatementDistributionMessage::Share(self.parent, s); @@ -804,6 +914,7 @@ impl util::JobTrait for CandidateBackingJob { Some(r) => r, }; + let (background_tx, background_rx) = mpsc::channel(16); let job = CandidateBackingJob { parent, rx_to, @@ -811,12 +922,15 @@ impl util::JobTrait for CandidateBackingJob { assignment, required_collator, issued_statements: HashSet::new(), + awaiting_validation: HashSet::new(), seconded: None, backed: HashSet::new(), reported_misbehavior_for: HashSet::new(), keystore, table: Table::default(), table_context, + background_validation: background_rx, + background_validation_tx: background_tx, metrics, }; @@ -925,9 +1039,8 @@ mod tests { use assert_matches::assert_matches; use futures::{future, Future}; use polkadot_primitives::v1::{ - ScheduledCore, BlockData, CandidateCommitments, - PersistedValidationData, ValidationData, TransientValidationData, HeadData, - GroupRotationInfo, + ScheduledCore, BlockData, PersistedValidationData, ValidationData, + TransientValidationData, HeadData, GroupRotationInfo, }; use polkadot_subsystem::{ messages::{RuntimeApiRequest, RuntimeApiMessage}, @@ -1357,13 +1470,6 @@ mod tests { } ); - let statement = CandidateBackingMessage::Statement( - test_state.relay_parent, - signed_b.clone(), - ); - - virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; - assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( @@ -1374,6 +1480,13 @@ mod tests { } ); + let statement = CandidateBackingMessage::Statement( + test_state.relay_parent, + signed_b.clone(), + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + assert_matches!( virtual_overseer.recv().await, AllMessages::Provisioner( @@ -1404,6 +1517,152 @@ mod tests { }); } + #[test] + fn backing_works_while_validation_ongoing() { + let test_state = TestState::default(); + test_harness(test_state.keystore.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + + test_startup(&mut virtual_overseer, &test_state).await; + + let pov = PoV { + block_data: BlockData(vec![1, 2, 3]), + }; + + let pov_hash = pov.hash(); + + let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); + + let candidate_a = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + head_data: expected_head_data.clone(), + erasure_root: make_erasure_root(&test_state, pov.clone()), + ..Default::default() + }.build(); + + let candidate_a_hash = candidate_a.hash(); + let public1 = CryptoStore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[5].to_seed()), + ).await.expect("Insert key into keystore"); + let public2 = CryptoStore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[2].to_seed()), + ).await.expect("Insert key into keystore"); + let public3 = CryptoStore::sr25519_generate_new( + &*test_state.keystore, + ValidatorId::ID, + Some(&test_state.validators[3].to_seed()), + ).await.expect("Insert key into keystore"); + + let signed_a = SignedFullStatement::sign( + &test_state.keystore, + Statement::Seconded(candidate_a.clone()), + &test_state.signing_context, + 2, + &public2.into(), + ).await.expect("should be signed"); + + let signed_b = SignedFullStatement::sign( + &test_state.keystore, + Statement::Valid(candidate_a_hash), + &test_state.signing_context, + 5, + &public1.into(), + ).await.expect("should be signed"); + + let signed_c = SignedFullStatement::sign( + &test_state.keystore, + Statement::Valid(candidate_a_hash), + &test_state.signing_context, + 3, + &public3.into(), + ).await.expect("should be signed"); + + let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone()); + virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + + // Sending a `Statement::Seconded` for our assignment will start + // validation process. The first thing requested is PoV from the + // `PoVDistribution`. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::PoVDistribution( + PoVDistributionMessage::FetchPoV(relay_parent, _, tx) + ) if relay_parent == test_state.relay_parent => { + tx.send(Arc::new(pov.clone())).unwrap(); + } + ); + + // The next step is the actual request to Validation subsystem + // to validate the `Seconded` candidate. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::CandidateValidation( + CandidateValidationMessage::ValidateFromChainState( + c, + pov, + tx, + ) + ) if pov == pov && &c == candidate_a.descriptor() => { + // we never validate the candidate. our local node + // shouldn't issue any statements. + std::mem::forget(tx); + } + ); + + let statement = CandidateBackingMessage::Statement( + test_state.relay_parent, + signed_b.clone(), + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + + let statement = CandidateBackingMessage::Statement( + test_state.relay_parent, + signed_c.clone(), + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await; + + // Candidate gets backed entirely by other votes. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::Provisioner( + ProvisionerMessage::ProvisionableData( + _, + ProvisionableData::BackedCandidate(BackedCandidate { + candidate, + validity_votes, + validator_indices, + }) + ) + ) if candidate == candidate_a => { + assert_eq!(validity_votes.len(), 3); + + assert!(validity_votes.contains( + &ValidityAttestation::Implicit(signed_a.signature().clone()) + )); + assert!(validity_votes.contains( + &ValidityAttestation::Explicit(signed_b.signature().clone()) + )); + assert!(validity_votes.contains( + &ValidityAttestation::Explicit(signed_c.signature().clone()) + )); + assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 0, 1, 1]); + } + ); + + virtual_overseer.send(FromOverseer::Signal( + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) + ).await; + }); + } + // Issuing conflicting statements on the same candidate should // be a misbehavior. #[test]