diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 44ec79e1d7..20a1a4d421 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -39,7 +39,7 @@ use polkadot_node_primitives::{ use polkadot_subsystem::{ messages::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, - CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, + CandidateValidationMessage, PoVDistributionMessage, ProvisionableData, ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest, }, }; @@ -74,11 +74,17 @@ enum Error { #[error("Signature is invalid")] InvalidSignature, #[error("Failed to send candidates {0:?}")] - Send(Vec), - #[error("Oneshot never resolved")] - Oneshot(#[from] #[source] oneshot::Canceled), + Send(Vec), + #[error("FetchPoV channel closed before receipt")] + FetchPoV(#[source] oneshot::Canceled), + #[error("ValidateFromChainState channel closed before receipt")] + ValidateFromChainState(#[source] oneshot::Canceled), + #[error("StoreAvailableData channel closed before receipt")] + StoreAvailableData(#[source] oneshot::Canceled), + #[error("a channel was closed before receipt in try_join!")] + JoinMultiple(#[source] oneshot::Canceled), #[error("Obtaining erasure chunks failed")] - ObtainErasureChunks(#[from] #[source] erasure_coding::Error), + ObtainErasureChunks(#[from] erasure_coding::Error), #[error(transparent)] ValidationFailed(#[from] ValidationFailed), #[error(transparent)] @@ -124,7 +130,7 @@ struct CandidateBackingJob { /// Outbound message channel sending part. tx_from: mpsc::Sender, /// The `ParaId` assigned to this validator - assignment: ParaId, + assignment: Option, /// The collator required to author the candidate, if any. required_collator: Option, /// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates. @@ -270,7 +276,7 @@ async fn store_available_data( ).into() ).await?; - let _ = rx.await?; + let _ = rx.await.map_err(Error::StoreAvailableData)?; Ok(()) } @@ -328,7 +334,7 @@ async fn request_pov_from_distribution( PoVDistributionMessage::FetchPoV(parent, descriptor, tx) ).into()).await?; - Ok(rx.await?) + rx.await.map_err(Error::FetchPoV) } async fn request_candidate_validation( @@ -347,7 +353,11 @@ async fn request_candidate_validation( ).into() ).await?; - Ok(rx.await??) + match rx.await { + Ok(Ok(validation_result)) => Ok(validation_result), + Ok(Err(err)) => Err(Error::ValidationFailed(err)), + Err(err) => Err(Error::ValidateFromChainState(err)), + } } type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc), CandidateReceipt>; @@ -567,21 +577,6 @@ impl CandidateBackingJob { Ok(()) } - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn get_backed(&self) -> Vec { - let proposed = self.table.proposed_candidates(&self.table_context); - let mut res = Vec::with_capacity(proposed.len()); - - for p in proposed.into_iter() { - match table_attested_to_backed(p, &self.table_context) { - None => continue, - Some(backed) => res.push(NewBackedCandidate(backed)), - } - } - - res - } - /// Check if there have happened any new misbehaviors and issue necessary messages. /// /// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387) @@ -641,7 +636,7 @@ impl CandidateBackingJob { { let message = ProvisionerMessage::ProvisionableData( self.parent, - ProvisionableData::BackedCandidate(backed), + ProvisionableData::BackedCandidate(backed.receipt()), ); self.send_to_provisioner(message).await?; } @@ -661,7 +656,7 @@ impl CandidateBackingJob { let _timer = self.metrics.time_process_second(); // Sanity check that candidate is from our assignment. - if candidate.descriptor().para_id != self.assignment { + if Some(candidate.descriptor().para_id) != self.assignment { return Ok(()); } @@ -688,10 +683,16 @@ impl CandidateBackingJob { Ok(()) => (), } } - CandidateBackingMessage::GetBackedCandidates(_, tx) => { + CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => { let _timer = self.metrics.time_get_backed_candidates(); - let backed = self.get_backed(); + let backed = requested_candidates + .into_iter() + .filter_map(|hash| { + self.table.attested_candidate(&hash, &self.table_context) + .and_then(|attested| table_attested_to_backed(attested, &self.table_context)) + }) + .collect(); tx.send(backed).map_err(|data| Error::Send(data))?; } @@ -750,7 +751,7 @@ impl CandidateBackingJob { ) -> Result<(), Error> { if let Some(summary) = self.import_statement(&statement).await? { if let Statement::Seconded(_) = statement.payload() { - if summary.group_id == self.assignment { + if Some(summary.group_id) == self.assignment { self.kick_off_validation_work(summary).await?; } } @@ -850,15 +851,15 @@ impl util::JobTrait for CandidateBackingJob { } let (validators, groups, session_index, cores) = futures::try_join!( - request_validators(parent, &mut tx_from).await?, - request_validator_groups(parent, &mut tx_from).await?, - request_session_index_for_child(parent, &mut tx_from).await?, - request_from_runtime( + try_runtime_api!(request_validators(parent, &mut tx_from).await), + try_runtime_api!(request_validator_groups(parent, &mut tx_from).await), + try_runtime_api!(request_session_index_for_child(parent, &mut tx_from).await), + try_runtime_api!(request_from_runtime( parent, &mut tx_from, |tx| RuntimeApiRequest::AvailabilityCores(tx), - ).await?, - )?; + ).await), + ).map_err(Error::JoinMultiple)?; let validators = try_runtime_api!(validators); let (validator_groups, group_rotation_info) = try_runtime_api!(groups); @@ -911,8 +912,8 @@ impl util::JobTrait for CandidateBackingJob { }; let (assignment, required_collator) = match assignment { - None => return Ok(()), // no need to work. - Some(r) => r, + None => (None, None), + Some((assignment, required_collator)) => (Some(assignment), required_collator), }; let (background_tx, background_rx) = mpsc::channel(16); @@ -1492,22 +1493,10 @@ mod tests { AllMessages::Provisioner( ProvisionerMessage::ProvisionableData( _, - ProvisionableData::BackedCandidate(BackedCandidate { - candidate, - validity_votes, - validator_indices, - }) + ProvisionableData::BackedCandidate(candidate_receipt) ) - ) if candidate == candidate_a => { - assert_eq!(validity_votes.len(), 3); - - assert!(validity_votes.contains( - &ValidityAttestation::Explicit(signed_b.signature().clone()) - )); - assert!(validity_votes.contains( - &ValidityAttestation::Implicit(signed_a.signature().clone()) - )); - assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0, 1]); + ) => { + assert_eq!(candidate_receipt, candidate_a.to_plain()); } ); @@ -2190,6 +2179,7 @@ mod tests { let (tx, rx) = oneshot::channel(); let msg = CandidateBackingMessage::GetBackedCandidates( test_state.relay_parent, + vec![candidate.hash()], tx, ); diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 560764453a..be3f38ba1e 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -26,14 +26,17 @@ use futures::{ }; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, - messages::{ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, AllMessages}, + messages::{ + AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, + ProvisionerMessage, + }, }; use polkadot_node_subsystem_util::{ self as util, delegated_subsystem, FromJobCommand, request_availability_cores, request_persisted_validation_data, JobTrait, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{ - BackedCandidate, BlockNumber, CoreState, Hash, OccupiedCoreAssumption, + BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption, SignedAvailabilityBitfield, ValidatorIndex, }; use std::{pin::Pin, collections::BTreeMap}; @@ -82,7 +85,7 @@ struct ProvisioningJob { sender: mpsc::Sender, receiver: mpsc::Receiver, provisionable_data_channels: Vec>, - backed_candidates: Vec, + backed_candidates: Vec, signed_bitfields: Vec, metrics: Metrics, inherent_after: InherentAfter, @@ -94,8 +97,17 @@ enum Error { #[error(transparent)] Util(#[from] util::Error), - #[error(transparent)] - OneshotRecv(#[from] oneshot::Canceled), + #[error("failed to get availability cores")] + CanceledAvailabilityCores(#[source] oneshot::Canceled), + + #[error("failed to get persisted validation data")] + CanceledPersistedValidationData(#[source] oneshot::Canceled), + + #[error("failed to get block number")] + CanceledBlockNumber(#[source] oneshot::Canceled), + + #[error("failed to get backed candidates")] + CanceledBackedCandidates(#[source] oneshot::Canceled), #[error(transparent)] ChainApi(#[from] ChainApiError), @@ -103,11 +115,17 @@ enum Error { #[error(transparent)] Runtime(#[from] RuntimeApiError), - #[error("Failed to send message to ChainAPI")] + #[error("failed to send message to ChainAPI")] ChainApiMessageSend(#[source] mpsc::SendError), - #[error("Failed to send return message with Inherents")] + #[error("failed to send message to CandidateBacking to get backed candidates")] + GetBackedCandidatesSend(#[source] mpsc::SendError), + + #[error("failed to send return message with Inherents")] InherentDataReturnChannel, + + #[error("backed candidate does not correspond to selected candidate; check logic in provisioner")] + BackedCandidateOrderingProblem, } impl JobTrait for ProvisioningJob { @@ -291,13 +309,13 @@ type CoreAvailability = BitVec; async fn send_inherent_data( relay_parent: Hash, bitfields: &[SignedAvailabilityBitfield], - candidates: &[BackedCandidate], + candidates: &[CandidateReceipt], return_senders: Vec>, from_job: &mut mpsc::Sender, ) -> Result<(), Error> { let availability_cores = request_availability_cores(relay_parent, from_job) .await? - .await??; + .await.map_err(|err| Error::CanceledAvailabilityCores(err))??; let bitfields = select_availability_bitfields(&availability_cores, bitfields); let candidates = select_candidates( @@ -363,7 +381,7 @@ fn select_availability_bitfields( async fn select_candidates( availability_cores: &[CoreState], bitfields: &[SignedAvailabilityBitfield], - candidates: &[BackedCandidate], + candidates: &[CandidateReceipt], relay_parent: Hash, sender: &mut mpsc::Sender, ) -> Result, Error> { @@ -403,7 +421,7 @@ async fn select_candidates( sender, ) .await? - .await?? + .await.map_err(|err| Error::CanceledPersistedValidationData(err))?? { Some(v) => v, None => continue, @@ -413,15 +431,40 @@ async fn select_candidates( // we arbitrarily pick the first of the backed candidates which match the appropriate selection criteria if let Some(candidate) = candidates.iter().find(|backed_candidate| { - let descriptor = &backed_candidate.candidate.descriptor; + let descriptor = &backed_candidate.descriptor; descriptor.para_id == scheduled_core.para_id && descriptor.persisted_validation_data_hash == computed_validation_data_hash }) { - selected_candidates.push(candidate.clone()); + selected_candidates.push(candidate.hash()); } } - Ok(selected_candidates) + // now get the backed candidates corresponding to these candidate receipts + let (tx, rx) = oneshot::channel(); + sender.send(AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates( + relay_parent, + selected_candidates.clone(), + tx, + )).into()).await.map_err(|err| Error::GetBackedCandidatesSend(err))?; + let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?; + + // `selected_candidates` is generated in ascending order by core index, and `GetBackedCandidates` + // _should_ preserve that property, but let's just make sure. + // + // We can't easily map from `BackedCandidate` to `core_idx`, but we know that every selected candidate + // maps to either 0 or 1 backed candidate, and the hashes correspond. Therefore, by checking them + // in order, we can ensure that the backed candidates are also in order. + let mut backed_idx = 0; + for selected in selected_candidates.iter() { + if *selected == candidates.get(backed_idx).ok_or(Error::BackedCandidateOrderingProblem)?.hash() { + backed_idx += 1; + } + } + if candidates.len() != backed_idx { + Err(Error::BackedCandidateOrderingProblem)?; + } + + Ok(candidates) } /// Produces a block number 1 higher than that of the relay parent @@ -439,7 +482,7 @@ async fn get_block_number_under_construction( )).into()) .await .map_err(|e| Error::ChainApiMessageSend(e))?; - match rx.await? { + match rx.await.map_err(|err| Error::CanceledBlockNumber(err))? { Ok(Some(n)) => Ok(n + 1), Ok(None) => Ok(0), Err(err) => Err(err.into()), diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 8abfcad206..82c3310f21 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -610,7 +610,10 @@ impl Jobs { async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) { if let Entry::Occupied(mut job) = self.running.entry(parent_hash) { if job.get_mut().send_msg(msg).await.is_err() { - tracing::debug!(job = Job::NAME, "failed to send message to job, will remove it"); + tracing::warn!( + job = Job::NAME, + relay_parent = ?parent_hash, + "failed to send message to job, will remove it"); job.remove(); } } diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 727814381f..3b7ee32dda 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -47,10 +47,6 @@ pub trait BoundToRelayParent { fn relay_parent(&self) -> Hash; } -/// A notification of a new backed candidate. -#[derive(Debug)] -pub struct NewBackedCandidate(pub BackedCandidate); - /// Messages received by the Candidate Selection subsystem. #[derive(Debug)] pub enum CandidateSelectionMessage { @@ -81,7 +77,7 @@ impl Default for CandidateSelectionMessage { pub enum CandidateBackingMessage { /// Requests a set of backable candidates that could be backed in a child of the given /// relay-parent, referenced by its hash. - GetBackedCandidates(Hash, oneshot::Sender>), + GetBackedCandidates(Hash, Vec, oneshot::Sender>), /// Note that the Candidate Backing subsystem should second the given candidate in the context of the /// given relay-parent (ref. by hash). This candidate must be validated. Second(Hash, CandidateReceipt, PoV), @@ -93,7 +89,7 @@ pub enum CandidateBackingMessage { impl BoundToRelayParent for CandidateBackingMessage { fn relay_parent(&self) -> Hash { match self { - Self::GetBackedCandidates(hash, _) => *hash, + Self::GetBackedCandidates(hash, _, _) => *hash, Self::Second(hash, _, _) => *hash, Self::Statement(hash, _) => *hash, } @@ -497,7 +493,7 @@ pub enum ProvisionableData { /// This bitfield indicates the availability of various candidate blocks. Bitfield(Hash, SignedAvailabilityBitfield), /// The Candidate Backing subsystem believes that this candidate is valid, pending availability. - BackedCandidate(BackedCandidate), + BackedCandidate(CandidateReceipt), /// Misbehavior reports are self-contained proofs of validator misbehavior. MisbehaviorReport(Hash, MisbehaviorReport), /// Disputes trigger a broad dispute resolution process. diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index 947c595036..1b46980684 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -416,6 +416,16 @@ impl BackedCandidate { pub fn descriptor(&self) -> &CandidateDescriptor { &self.candidate.descriptor } + + /// Compute this candidate's hash. + pub fn hash(&self) -> CandidateHash where H: Clone + Encode { + self.candidate.hash() + } + + /// Get this candidate's receipt. + pub fn receipt(&self) -> CandidateReceipt where H: Clone { + self.candidate.to_plain() + } } /// Verify the backing of the given candidate. diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md index 3acf058a7d..016c509674 100644 --- a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md +++ b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md @@ -67,7 +67,7 @@ The goal of a Candidate Backing Job is to produce as many backable candidates as ```rust match msg { - CetBackedCandidates(hash, tx) => { + GetBackedCandidates(hashes, tx) => { // Send back a set of backable candidates. } CandidateBackingMessage::Second(hash, candidate) => { @@ -88,7 +88,7 @@ match msg { } ``` -Add `Seconded` statements and `Valid` statements to a quorum. If quorum reaches validator-group majority, send a [`ProvisionerMessage`][PM]`::ProvisionableData(ProvisionableData::BackedCandidate(BackedCandidate))` message. +Add `Seconded` statements and `Valid` statements to a quorum. If quorum reaches validator-group majority, send a [`ProvisionerMessage`][PM]`::ProvisionableData(ProvisionableData::BackedCandidate(CandidateReceipt))` message. `Invalid` statements that conflict with already witnessed `Seconded` and `Valid` statements for the given candidate, statements that are double-votes, self-contradictions and so on, should result in issuing a [`ProvisionerMessage`][PM]`::MisbehaviorReport` message for each newly detected case of this kind. ### Validating Candidates. diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 6debd20523..44f62c3e8d 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -211,7 +211,7 @@ enum BitfieldSigningMessage { } enum CandidateBackingMessage { /// Requests a set of backable candidates that could be backed in a child of the given /// relay-parent, referenced by its hash. - GetBackedCandidates(Hash, ResponseChannel>), + GetBackedCandidates(Hash, Vec, ResponseChannel>), /// Note that the Candidate Backing subsystem should second the given candidate in the context of the /// given relay-parent (ref. by hash). This candidate must be validated using the provided PoV. /// The PoV is expected to match the `pov_hash` in the descriptor. @@ -384,7 +384,7 @@ enum ProvisionableData { /// This bitfield indicates the availability of various candidate blocks. Bitfield(Hash, SignedAvailabilityBitfield), /// The Candidate Backing subsystem believes that this candidate is valid, pending availability. - BackedCandidate(BackedCandidate), + BackedCandidate(CandidateReceipt), /// Misbehavior reports are self-contained proofs of validator misbehavior. MisbehaviorReport(Hash, MisbehaviorReport), /// Disputes trigger a broad dispute resolution process.