From e7e9605f87db07fd1082476cd1f9d33936369a71 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Fri, 4 Dec 2020 11:24:59 +0100 Subject: [PATCH] do not store backed candidates in the provisioner (#1909) * guide: non-semantic changes * guide: update per the issue description * GetBackedCandidates operates on multiple hashes now * GetBackedCandidates still needs a relay parent * implement changes specified in guide * distinguish between various occasions for canceled oneshots * add tracing info to getbackedcandidates * REVERT ME: add tracing messages for GetBackedCandidates Note that these messages are only sometimes actually passed on to the candidate backing subsystem, with the consequence that it is unexpectedly frequent that the provisioner fails to create its provisionable data. * REVERT ME: more tracing logging * REVERT ME: log when CandidateBackingJob receives any message at all * REVERT ME: log when send_msg sends a message to a job * fix candidate-backing tests * streamline GetBackedCandidates This uses table.attested_candidate instead of table.get_candidate, because it's not obvious how to get a BackedCandidate from just a CommittedCandidateReceipt. * REVERT ME: more logging tracing job lifespans * promote warning about job premature demise * don't terminate CandidateBackingJob::run_loop in event of failure to process message * Revert "REVERT ME: more logging tracing job lifespans" This reverts commit 7365f2fb3dec988d95cfcd317eba75587fe7fd16. * Revert "REVERT ME: log when send_msg sends a message to a job" This reverts commit 58e46aad038e6517d6d56390c8be65b046a21884. * Revert "REVERT ME: log when CandidateBackingJob receives any message at all" This reverts commit 0d6f38413c7c66b5e9e81dabc587906fa9f82656. * Revert "REVERT ME: more tracing logging" This reverts commit 675fd2628e84d1596965280e7314155ef21b28e6. * Revert "REVERT ME: add tracing messages for GetBackedCandidates" This reverts commit e09e156493430b33b6c8ab4b5cedb3f2f91afd51. 
* formatting * add logging message to CandidateBackingJob::run_loop start * REVERT ME: add tracing to candidate-backing job creation * run candidatebacking loop even if no assignment * use unique error variants for each canceled oneshot * Revert "REVERT ME: add tracing to candidate-backing job creation" This reverts commit 8ce5f4f0bd7186dade134b118751480f72ea1fd6. * try_runtime_api more to reduce silent exits * add sanity check that returned backed candidates preserve ordering * remove redundant err attribute --- polkadot/node/core/backing/src/lib.rs | 94 +++++++++---------- polkadot/node/core/provisioner/src/lib.rs | 73 +++++++++++--- polkadot/node/subsystem-util/src/lib.rs | 5 +- polkadot/node/subsystem/src/messages.rs | 10 +- polkadot/primitives/src/v1.rs | 10 ++ .../src/node/backing/candidate-backing.md | 4 +- .../src/types/overseer-protocol.md | 4 +- 7 files changed, 121 insertions(+), 79 deletions(-) diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 44ec79e1d7..20a1a4d421 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -39,7 +39,7 @@ use polkadot_node_primitives::{ use polkadot_subsystem::{ messages::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, - CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, + CandidateValidationMessage, PoVDistributionMessage, ProvisionableData, ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest, }, }; @@ -74,11 +74,17 @@ enum Error { #[error("Signature is invalid")] InvalidSignature, #[error("Failed to send candidates {0:?}")] - Send(Vec), - #[error("Oneshot never resolved")] - Oneshot(#[from] #[source] oneshot::Canceled), + Send(Vec), + #[error("FetchPoV channel closed before receipt")] + FetchPoV(#[source] oneshot::Canceled), + #[error("ValidateFromChainState channel closed before receipt")] + 
ValidateFromChainState(#[source] oneshot::Canceled), + #[error("StoreAvailableData channel closed before receipt")] + StoreAvailableData(#[source] oneshot::Canceled), + #[error("a channel was closed before receipt in try_join!")] + JoinMultiple(#[source] oneshot::Canceled), #[error("Obtaining erasure chunks failed")] - ObtainErasureChunks(#[from] #[source] erasure_coding::Error), + ObtainErasureChunks(#[from] erasure_coding::Error), #[error(transparent)] ValidationFailed(#[from] ValidationFailed), #[error(transparent)] @@ -124,7 +130,7 @@ struct CandidateBackingJob { /// Outbound message channel sending part. tx_from: mpsc::Sender, /// The `ParaId` assigned to this validator - assignment: ParaId, + assignment: Option, /// The collator required to author the candidate, if any. required_collator: Option, /// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates. @@ -270,7 +276,7 @@ async fn store_available_data( ).into() ).await?; - let _ = rx.await?; + let _ = rx.await.map_err(Error::StoreAvailableData)?; Ok(()) } @@ -328,7 +334,7 @@ async fn request_pov_from_distribution( PoVDistributionMessage::FetchPoV(parent, descriptor, tx) ).into()).await?; - Ok(rx.await?) + rx.await.map_err(Error::FetchPoV) } async fn request_candidate_validation( @@ -347,7 +353,11 @@ async fn request_candidate_validation( ).into() ).await?; - Ok(rx.await??) 
+ match rx.await { + Ok(Ok(validation_result)) => Ok(validation_result), + Ok(Err(err)) => Err(Error::ValidationFailed(err)), + Err(err) => Err(Error::ValidateFromChainState(err)), + } } type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc), CandidateReceipt>; @@ -567,21 +577,6 @@ impl CandidateBackingJob { Ok(()) } - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn get_backed(&self) -> Vec { - let proposed = self.table.proposed_candidates(&self.table_context); - let mut res = Vec::with_capacity(proposed.len()); - - for p in proposed.into_iter() { - match table_attested_to_backed(p, &self.table_context) { - None => continue, - Some(backed) => res.push(NewBackedCandidate(backed)), - } - } - - res - } - /// Check if there have happened any new misbehaviors and issue necessary messages. /// /// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387) @@ -641,7 +636,7 @@ impl CandidateBackingJob { { let message = ProvisionerMessage::ProvisionableData( self.parent, - ProvisionableData::BackedCandidate(backed), + ProvisionableData::BackedCandidate(backed.receipt()), ); self.send_to_provisioner(message).await?; } @@ -661,7 +656,7 @@ impl CandidateBackingJob { let _timer = self.metrics.time_process_second(); // Sanity check that candidate is from our assignment. 
- if candidate.descriptor().para_id != self.assignment { + if Some(candidate.descriptor().para_id) != self.assignment { return Ok(()); } @@ -688,10 +683,16 @@ impl CandidateBackingJob { Ok(()) => (), } } - CandidateBackingMessage::GetBackedCandidates(_, tx) => { + CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => { let _timer = self.metrics.time_get_backed_candidates(); - let backed = self.get_backed(); + let backed = requested_candidates + .into_iter() + .filter_map(|hash| { + self.table.attested_candidate(&hash, &self.table_context) + .and_then(|attested| table_attested_to_backed(attested, &self.table_context)) + }) + .collect(); tx.send(backed).map_err(|data| Error::Send(data))?; } @@ -750,7 +751,7 @@ impl CandidateBackingJob { ) -> Result<(), Error> { if let Some(summary) = self.import_statement(&statement).await? { if let Statement::Seconded(_) = statement.payload() { - if summary.group_id == self.assignment { + if Some(summary.group_id) == self.assignment { self.kick_off_validation_work(summary).await?; } } @@ -850,15 +851,15 @@ impl util::JobTrait for CandidateBackingJob { } let (validators, groups, session_index, cores) = futures::try_join!( - request_validators(parent, &mut tx_from).await?, - request_validator_groups(parent, &mut tx_from).await?, - request_session_index_for_child(parent, &mut tx_from).await?, - request_from_runtime( + try_runtime_api!(request_validators(parent, &mut tx_from).await), + try_runtime_api!(request_validator_groups(parent, &mut tx_from).await), + try_runtime_api!(request_session_index_for_child(parent, &mut tx_from).await), + try_runtime_api!(request_from_runtime( parent, &mut tx_from, |tx| RuntimeApiRequest::AvailabilityCores(tx), - ).await?, - )?; + ).await), + ).map_err(Error::JoinMultiple)?; let validators = try_runtime_api!(validators); let (validator_groups, group_rotation_info) = try_runtime_api!(groups); @@ -911,8 +912,8 @@ impl util::JobTrait for CandidateBackingJob { }; let (assignment, 
required_collator) = match assignment { - None => return Ok(()), // no need to work. - Some(r) => r, + None => (None, None), + Some((assignment, required_collator)) => (Some(assignment), required_collator), }; let (background_tx, background_rx) = mpsc::channel(16); @@ -1492,22 +1493,10 @@ mod tests { AllMessages::Provisioner( ProvisionerMessage::ProvisionableData( _, - ProvisionableData::BackedCandidate(BackedCandidate { - candidate, - validity_votes, - validator_indices, - }) + ProvisionableData::BackedCandidate(candidate_receipt) ) - ) if candidate == candidate_a => { - assert_eq!(validity_votes.len(), 3); - - assert!(validity_votes.contains( - &ValidityAttestation::Explicit(signed_b.signature().clone()) - )); - assert!(validity_votes.contains( - &ValidityAttestation::Implicit(signed_a.signature().clone()) - )); - assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0, 1]); + ) => { + assert_eq!(candidate_receipt, candidate_a.to_plain()); } ); @@ -2190,6 +2179,7 @@ mod tests { let (tx, rx) = oneshot::channel(); let msg = CandidateBackingMessage::GetBackedCandidates( test_state.relay_parent, + vec![candidate.hash()], tx, ); diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 560764453a..be3f38ba1e 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -26,14 +26,17 @@ use futures::{ }; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, - messages::{ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, AllMessages}, + messages::{ + AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, + ProvisionerMessage, + }, }; use polkadot_node_subsystem_util::{ self as util, delegated_subsystem, FromJobCommand, request_availability_cores, request_persisted_validation_data, JobTrait, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{ - BackedCandidate, 
BlockNumber, CoreState, Hash, OccupiedCoreAssumption, + BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption, SignedAvailabilityBitfield, ValidatorIndex, }; use std::{pin::Pin, collections::BTreeMap}; @@ -82,7 +85,7 @@ struct ProvisioningJob { sender: mpsc::Sender, receiver: mpsc::Receiver, provisionable_data_channels: Vec>, - backed_candidates: Vec, + backed_candidates: Vec, signed_bitfields: Vec, metrics: Metrics, inherent_after: InherentAfter, @@ -94,8 +97,17 @@ enum Error { #[error(transparent)] Util(#[from] util::Error), - #[error(transparent)] - OneshotRecv(#[from] oneshot::Canceled), + #[error("failed to get availability cores")] + CanceledAvailabilityCores(#[source] oneshot::Canceled), + + #[error("failed to get persisted validation data")] + CanceledPersistedValidationData(#[source] oneshot::Canceled), + + #[error("failed to get block number")] + CanceledBlockNumber(#[source] oneshot::Canceled), + + #[error("failed to get backed candidates")] + CanceledBackedCandidates(#[source] oneshot::Canceled), #[error(transparent)] ChainApi(#[from] ChainApiError), @@ -103,11 +115,17 @@ enum Error { #[error(transparent)] Runtime(#[from] RuntimeApiError), - #[error("Failed to send message to ChainAPI")] + #[error("failed to send message to ChainAPI")] ChainApiMessageSend(#[source] mpsc::SendError), - #[error("Failed to send return message with Inherents")] + #[error("failed to send message to CandidateBacking to get backed candidates")] + GetBackedCandidatesSend(#[source] mpsc::SendError), + + #[error("failed to send return message with Inherents")] InherentDataReturnChannel, + + #[error("backed candidate does not correspond to selected candidate; check logic in provisioner")] + BackedCandidateOrderingProblem, } impl JobTrait for ProvisioningJob { @@ -291,13 +309,13 @@ type CoreAvailability = BitVec; async fn send_inherent_data( relay_parent: Hash, bitfields: &[SignedAvailabilityBitfield], - candidates: &[BackedCandidate], + 
candidates: &[CandidateReceipt], return_senders: Vec>, from_job: &mut mpsc::Sender, ) -> Result<(), Error> { let availability_cores = request_availability_cores(relay_parent, from_job) .await? - .await??; + .await.map_err(|err| Error::CanceledAvailabilityCores(err))??; let bitfields = select_availability_bitfields(&availability_cores, bitfields); let candidates = select_candidates( @@ -363,7 +381,7 @@ fn select_availability_bitfields( async fn select_candidates( availability_cores: &[CoreState], bitfields: &[SignedAvailabilityBitfield], - candidates: &[BackedCandidate], + candidates: &[CandidateReceipt], relay_parent: Hash, sender: &mut mpsc::Sender, ) -> Result, Error> { @@ -403,7 +421,7 @@ async fn select_candidates( sender, ) .await? - .await?? + .await.map_err(|err| Error::CanceledPersistedValidationData(err))?? { Some(v) => v, None => continue, @@ -413,15 +431,40 @@ async fn select_candidates( // we arbitrarily pick the first of the backed candidates which match the appropriate selection criteria if let Some(candidate) = candidates.iter().find(|backed_candidate| { - let descriptor = &backed_candidate.candidate.descriptor; + let descriptor = &backed_candidate.descriptor; descriptor.para_id == scheduled_core.para_id && descriptor.persisted_validation_data_hash == computed_validation_data_hash }) { - selected_candidates.push(candidate.clone()); + selected_candidates.push(candidate.hash()); } } - Ok(selected_candidates) + // now get the backed candidates corresponding to these candidate receipts + let (tx, rx) = oneshot::channel(); + sender.send(AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates( + relay_parent, + selected_candidates.clone(), + tx, + )).into()).await.map_err(|err| Error::GetBackedCandidatesSend(err))?; + let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?; + + // `selected_candidates` is generated in ascending order by core index, and `GetBackedCandidates` + // _should_ preserve that property, but 
let's just make sure. + // + // We can't easily map from `BackedCandidate` to `core_idx`, but we know that every selected candidate + // maps to either 0 or 1 backed candidate, and the hashes correspond. Therefore, by checking them + // in order, we can ensure that the backed candidates are also in order. + let mut backed_idx = 0; + for selected in selected_candidates.iter() { + if *selected == candidates.get(backed_idx).ok_or(Error::BackedCandidateOrderingProblem)?.hash() { + backed_idx += 1; + } + } + if candidates.len() != backed_idx { + Err(Error::BackedCandidateOrderingProblem)?; + } + + Ok(candidates) } /// Produces a block number 1 higher than that of the relay parent @@ -439,7 +482,7 @@ async fn get_block_number_under_construction( )).into()) .await .map_err(|e| Error::ChainApiMessageSend(e))?; - match rx.await? { + match rx.await.map_err(|err| Error::CanceledBlockNumber(err))? { Ok(Some(n)) => Ok(n + 1), Ok(None) => Ok(0), Err(err) => Err(err.into()), diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 8abfcad206..82c3310f21 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -610,7 +610,10 @@ impl Jobs { async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) { if let Entry::Occupied(mut job) = self.running.entry(parent_hash) { if job.get_mut().send_msg(msg).await.is_err() { - tracing::debug!(job = Job::NAME, "failed to send message to job, will remove it"); + tracing::warn!( + job = Job::NAME, + relay_parent = ?parent_hash, + "failed to send message to job, will remove it"); job.remove(); } } diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 727814381f..3b7ee32dda 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -47,10 +47,6 @@ pub trait BoundToRelayParent { fn relay_parent(&self) -> Hash; } -/// A notification of a new backed candidate. 
-#[derive(Debug)] -pub struct NewBackedCandidate(pub BackedCandidate); - /// Messages received by the Candidate Selection subsystem. #[derive(Debug)] pub enum CandidateSelectionMessage { @@ -81,7 +77,7 @@ impl Default for CandidateSelectionMessage { pub enum CandidateBackingMessage { /// Requests a set of backable candidates that could be backed in a child of the given /// relay-parent, referenced by its hash. - GetBackedCandidates(Hash, oneshot::Sender>), + GetBackedCandidates(Hash, Vec, oneshot::Sender>), /// Note that the Candidate Backing subsystem should second the given candidate in the context of the /// given relay-parent (ref. by hash). This candidate must be validated. Second(Hash, CandidateReceipt, PoV), @@ -93,7 +89,7 @@ pub enum CandidateBackingMessage { impl BoundToRelayParent for CandidateBackingMessage { fn relay_parent(&self) -> Hash { match self { - Self::GetBackedCandidates(hash, _) => *hash, + Self::GetBackedCandidates(hash, _, _) => *hash, Self::Second(hash, _, _) => *hash, Self::Statement(hash, _) => *hash, } @@ -497,7 +493,7 @@ pub enum ProvisionableData { /// This bitfield indicates the availability of various candidate blocks. Bitfield(Hash, SignedAvailabilityBitfield), /// The Candidate Backing subsystem believes that this candidate is valid, pending availability. - BackedCandidate(BackedCandidate), + BackedCandidate(CandidateReceipt), /// Misbehavior reports are self-contained proofs of validator misbehavior. MisbehaviorReport(Hash, MisbehaviorReport), /// Disputes trigger a broad dispute resolution process. diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index 947c595036..1b46980684 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -416,6 +416,16 @@ impl BackedCandidate { pub fn descriptor(&self) -> &CandidateDescriptor { &self.candidate.descriptor } + + /// Compute this candidate's hash. 
+ pub fn hash(&self) -> CandidateHash where H: Clone + Encode { + self.candidate.hash() + } + + /// Get this candidate's receipt. + pub fn receipt(&self) -> CandidateReceipt where H: Clone { + self.candidate.to_plain() + } } /// Verify the backing of the given candidate. diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md index 3acf058a7d..016c509674 100644 --- a/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md +++ b/polkadot/roadmap/implementers-guide/src/node/backing/candidate-backing.md @@ -67,7 +67,7 @@ The goal of a Candidate Backing Job is to produce as many backable candidates as ```rust match msg { - CetBackedCandidates(hash, tx) => { + GetBackedCandidates(hashes, tx) => { // Send back a set of backable candidates. } CandidateBackingMessage::Second(hash, candidate) => { @@ -88,7 +88,7 @@ match msg { } ``` -Add `Seconded` statements and `Valid` statements to a quorum. If quorum reaches validator-group majority, send a [`ProvisionerMessage`][PM]`::ProvisionableData(ProvisionableData::BackedCandidate(BackedCandidate))` message. +Add `Seconded` statements and `Valid` statements to a quorum. If quorum reaches validator-group majority, send a [`ProvisionerMessage`][PM]`::ProvisionableData(ProvisionableData::BackedCandidate(CandidateReceipt))` message. `Invalid` statements that conflict with already witnessed `Seconded` and `Valid` statements for the given candidate, statements that are double-votes, self-contradictions and so on, should result in issuing a [`ProvisionerMessage`][PM]`::MisbehaviorReport` message for each newly detected case of this kind. ### Validating Candidates. 
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 6debd20523..44f62c3e8d 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -211,7 +211,7 @@ enum BitfieldSigningMessage { } enum CandidateBackingMessage { /// Requests a set of backable candidates that could be backed in a child of the given /// relay-parent, referenced by its hash. - GetBackedCandidates(Hash, ResponseChannel>), + GetBackedCandidates(Hash, Vec, ResponseChannel>), /// Note that the Candidate Backing subsystem should second the given candidate in the context of the /// given relay-parent (ref. by hash). This candidate must be validated using the provided PoV. /// The PoV is expected to match the `pov_hash` in the descriptor. @@ -384,7 +384,7 @@ enum ProvisionableData { /// This bitfield indicates the availability of various candidate blocks. Bitfield(Hash, SignedAvailabilityBitfield), /// The Candidate Backing subsystem believes that this candidate is valid, pending availability. - BackedCandidate(BackedCandidate), + BackedCandidate(CandidateReceipt), /// Misbehavior reports are self-contained proofs of validator misbehavior. MisbehaviorReport(Hash, MisbehaviorReport), /// Disputes trigger a broad dispute resolution process.