Move PVF timeouts to executor environment parameters (#6823)

* Move PVF timeouts to executor environment parameters

* Typo

Co-authored-by: Marcin S. <marcin@realemail.net>

* Fix comments

* Change handle_import_statements to FatalResult (#6820)

* Changing dispute db errors to fatal

* fmt

* Change node-key for bootnodes (#6772)

* Additional tracing in `provisioner`, `vote_selection` and `dispute-coordinator` (#6775)

* Additional tracing in `provisioner`, `vote_selection`

* Add `fetched_onchain_disputes` metric to provisioner

* Some tracelines in dispute-coordinator

TODO: cherry pick this in the initial branch!!!

* Remove spammy logs

* Remove some trace lines

* Rename and fix things

* Fix comments

* Typo

* Minor fixes

* Add codec indexes; Remove macro

---------

Co-authored-by: Marcin S. <marcin@realemail.net>
Co-authored-by: Bradley Olson <34992650+BradleyOlson64@users.noreply.github.com>
Co-authored-by: Petr Mensik <petr.mensik1@gmail.com>
Co-authored-by: Tsvetomir Dimitrov <tsvetomir@parity.io>
This commit is contained in:
s0me0ne-unkn0wn
2023-03-08 23:43:51 +01:00
committed by GitHub
parent 1c2215a75a
commit 03d4af104f
25 changed files with 359 additions and 415 deletions
@@ -26,7 +26,7 @@ use polkadot_node_primitives::{
approval::{ approval::{
BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote, BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote,
}, },
ValidationResult, APPROVAL_EXECUTION_TIMEOUT, ValidationResult,
}; };
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
errors::RecoveryError, errors::RecoveryError,
@@ -50,8 +50,8 @@ use polkadot_node_subsystem_util::{
}; };
use polkadot_primitives::{ use polkadot_primitives::{
ApprovalVote, BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement, ApprovalVote, BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement,
GroupIndex, Hash, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId, GroupIndex, Hash, PvfExecTimeoutKind, SessionIndex, SessionInfo, ValidDisputeStatementKind,
ValidatorIndex, ValidatorPair, ValidatorSignature, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature,
}; };
use sc_keystore::LocalKeystore; use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair; use sp_application_crypto::Pair;
@@ -2399,7 +2399,7 @@ async fn launch_approval<Context>(
validation_code, validation_code,
candidate.clone(), candidate.clone(),
available_data.pov, available_data.pov,
APPROVAL_EXECUTION_TIMEOUT, PvfExecTimeoutKind::Approval,
val_tx, val_tx,
)) ))
.await; .await;
@@ -2427,7 +2427,7 @@ pub async fn handle_double_assignment_import(
}, },
AllMessages::CandidateValidation( AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx), CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx),
) if timeout == APPROVAL_EXECUTION_TIMEOUT => { ) if timeout == PvfExecTimeoutKind::Approval => {
tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) tx.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap(); .unwrap();
}, },
+3 -4
View File
@@ -32,7 +32,6 @@ use futures::{
use error::{Error, FatalResult}; use error::{Error, FatalResult};
use polkadot_node_primitives::{ use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedFullStatement, Statement, ValidationResult, AvailableData, InvalidCandidate, PoV, SignedFullStatement, Statement, ValidationResult,
BACKING_EXECUTION_TIMEOUT,
}; };
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
jaeger, jaeger,
@@ -50,8 +49,8 @@ use polkadot_node_subsystem_util::{
}; };
use polkadot_primitives::{ use polkadot_primitives::{
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId, BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CollatorId,
CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SigningContext, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, PvfExecTimeoutKind,
ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
}; };
use sp_keystore::SyncCryptoStorePtr; use sp_keystore::SyncCryptoStorePtr;
use statement_table::{ use statement_table::{
@@ -650,7 +649,7 @@ async fn request_candidate_validation(
.send_message(CandidateValidationMessage::ValidateFromChainState( .send_message(CandidateValidationMessage::ValidateFromChainState(
candidate_receipt, candidate_receipt,
pov, pov,
BACKING_EXECUTION_TIMEOUT, PvfExecTimeoutKind::Backing,
tx, tx,
)) ))
.await; .await;
+10 -10
View File
@@ -32,7 +32,7 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_primitives::{ use polkadot_primitives::{
CandidateDescriptor, CollatorId, GroupRotationInfo, HeadData, PersistedValidationData, CandidateDescriptor, CollatorId, GroupRotationInfo, HeadData, PersistedValidationData,
ScheduledCore, PvfExecTimeoutKind, ScheduledCore,
}; };
use sp_application_crypto::AppKey; use sp_application_crypto::AppKey;
use sp_keyring::Sr25519Keyring; use sp_keyring::Sr25519Keyring;
@@ -307,7 +307,7 @@ fn backing_second_works() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate.commitments.hash() == candidate_receipt.commitments_hash => { ) if pov == pov && &candidate_receipt.descriptor == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate.commitments.hash() == candidate_receipt.commitments_hash => {
tx.send(Ok( tx.send(Ok(
ValidationResult::Valid(CandidateCommitments { ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(), head_data: expected_head_data.clone(),
@@ -453,7 +453,7 @@ fn backing_works() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate_a_commitments_hash=> { ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate_a_commitments_hash=> {
tx.send(Ok( tx.send(Ok(
ValidationResult::Valid(CandidateCommitments { ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(), head_data: expected_head_data.clone(),
@@ -625,7 +625,7 @@ fn backing_works_while_validation_ongoing() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate_a_commitments_hash == c.commitments_hash => { ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate_a_commitments_hash == c.commitments_hash => {
// we never validate the candidate. our local node // we never validate the candidate. our local node
// shouldn't issue any statements. // shouldn't issue any statements.
std::mem::forget(tx); std::mem::forget(tx);
@@ -777,7 +777,7 @@ fn backing_misbehavior_works() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && candidate_a_commitments_hash == c.commitments_hash => { ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing && candidate_a_commitments_hash == c.commitments_hash => {
tx.send(Ok( tx.send(Ok(
ValidationResult::Valid(CandidateCommitments { ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(), head_data: expected_head_data.clone(),
@@ -921,7 +921,7 @@ fn backing_dont_second_invalid() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { ) if pov == pov && c.descriptor() == candidate_a.descriptor() && timeout == PvfExecTimeoutKind::Backing => {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap();
} }
); );
@@ -950,7 +950,7 @@ fn backing_dont_second_invalid() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate_b.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT => { ) if pov == pov && c.descriptor() == candidate_b.descriptor() && timeout == PvfExecTimeoutKind::Backing => {
tx.send(Ok( tx.send(Ok(
ValidationResult::Valid(CandidateCommitments { ValidationResult::Valid(CandidateCommitments {
head_data: expected_head_data.clone(), head_data: expected_head_data.clone(),
@@ -1065,7 +1065,7 @@ fn backing_second_after_first_fails_works() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() => { ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() => {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap();
} }
); );
@@ -1191,7 +1191,7 @@ fn backing_works_after_failed_validation() {
timeout, timeout,
tx, tx,
) )
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() => { ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash() => {
tx.send(Err(ValidationFailed("Internal test error".into()))).unwrap(); tx.send(Err(ValidationFailed("Internal test error".into()))).unwrap();
} }
); );
@@ -1544,7 +1544,7 @@ fn retry_works() {
timeout, timeout,
_tx, _tx,
) )
) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == BACKING_EXECUTION_TIMEOUT && c.commitments_hash == candidate.commitments.hash() ) if pov == pov && c.descriptor() == candidate.descriptor() && timeout == PvfExecTimeoutKind::Backing && c.commitments_hash == candidate.commitments.hash()
); );
virtual_overseer virtual_overseer
}); });
@@ -24,7 +24,7 @@
#![warn(missing_docs)] #![warn(missing_docs)]
use polkadot_node_core_pvf::{ use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfWithExecutorParams, InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfPrepData,
ValidationError, ValidationHost, ValidationError, ValidationHost,
}; };
use polkadot_node_primitives::{ use polkadot_node_primitives::{
@@ -43,7 +43,8 @@ use polkadot_node_subsystem_util::executor_params_at_relay_parent;
use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult};
use polkadot_primitives::{ use polkadot_primitives::{
vstaging::ExecutorParams, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, vstaging::ExecutorParams, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash,
OccupiedCoreAssumption, PersistedValidationData, ValidationCode, ValidationCodeHash, OccupiedCoreAssumption, PersistedValidationData, PvfExecTimeoutKind, PvfPrepTimeoutKind,
ValidationCode, ValidationCodeHash,
}; };
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
@@ -68,6 +69,13 @@ const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)] #[cfg(test)]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
// Default PVF timeouts. Must never be changed! Use executor environment parameters in
// `session_info` pallet to adjust them. See also `PvfTimeoutKind` docs.
// NOTE(review): no `PvfTimeoutKind` is imported here; presumably `PvfPrepTimeoutKind` and
// `PvfExecTimeoutKind` are meant -- confirm and update the reference.
//
// These values are only fallbacks: `pvf_prep_timeout` and `pvf_exec_timeout` return them when
// the executor parameters do not provide a timeout for the requested kind.

// Preparation fallback for `PvfPrepTimeoutKind::Precheck`.
const DEFAULT_PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);
// Preparation fallback for `PvfPrepTimeoutKind::Lenient`.
const DEFAULT_LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);
// Execution fallback for `PvfExecTimeoutKind::Backing`.
const DEFAULT_BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2);
// Execution fallback for `PvfExecTimeoutKind::Approval`.
const DEFAULT_APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12);
/// Configuration for the candidate validation subsystem /// Configuration for the candidate validation subsystem
#[derive(Clone)] #[derive(Clone)]
pub struct Config { pub struct Config {
@@ -330,18 +338,20 @@ where
return PreCheckOutcome::Invalid return PreCheckOutcome::Invalid
}; };
let pvf_with_params = match sp_maybe_compressed_blob::decompress( let timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Precheck);
let pvf = match sp_maybe_compressed_blob::decompress(
&validation_code.0, &validation_code.0,
VALIDATION_CODE_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
) { ) {
Ok(code) => PvfWithExecutorParams::from_code(code.into_owned(), executor_params), Ok(code) => PvfPrepData::from_code(code.into_owned(), executor_params, timeout),
Err(e) => { Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
return PreCheckOutcome::Invalid return PreCheckOutcome::Invalid
}, },
}; };
match validation_backend.precheck_pvf(pvf_with_params).await { match validation_backend.precheck_pvf(pvf).await {
Ok(_) => PreCheckOutcome::Valid, Ok(_) => PreCheckOutcome::Valid,
Err(prepare_err) => Err(prepare_err) =>
if prepare_err.is_deterministic() { if prepare_err.is_deterministic() {
@@ -465,7 +475,7 @@ async fn validate_from_chain_state<Sender>(
validation_host: ValidationHost, validation_host: ValidationHost,
candidate_receipt: CandidateReceipt, candidate_receipt: CandidateReceipt,
pov: Arc<PoV>, pov: Arc<PoV>,
timeout: Duration, exec_timeout_kind: PvfExecTimeoutKind,
metrics: &Metrics, metrics: &Metrics,
) -> Result<ValidationResult, ValidationFailed> ) -> Result<ValidationResult, ValidationFailed>
where where
@@ -485,7 +495,7 @@ where
validation_code, validation_code,
candidate_receipt.clone(), candidate_receipt.clone(),
pov, pov,
timeout, exec_timeout_kind,
metrics, metrics,
) )
.await; .await;
@@ -521,7 +531,7 @@ async fn validate_candidate_exhaustive<Sender>(
validation_code: ValidationCode, validation_code: ValidationCode,
candidate_receipt: CandidateReceipt, candidate_receipt: CandidateReceipt,
pov: Arc<PoV>, pov: Arc<PoV>,
timeout: Duration, exec_timeout_kind: PvfExecTimeoutKind,
metrics: &Metrics, metrics: &Metrics,
) -> Result<ValidationResult, ValidationFailed> ) -> Result<ValidationResult, ValidationFailed>
where where
@@ -606,7 +616,7 @@ where
let result = validation_backend let result = validation_backend
.validate_candidate_with_retry( .validate_candidate_with_retry(
raw_validation_code.to_vec(), raw_validation_code.to_vec(),
timeout, pvf_exec_timeout(&executor_params, exec_timeout_kind),
params, params,
executor_params, executor_params,
) )
@@ -667,8 +677,8 @@ trait ValidationBackend {
/// Tries executing a PVF a single time (no retries). /// Tries executing a PVF a single time (no retries).
async fn validate_candidate( async fn validate_candidate(
&mut self, &mut self,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
timeout: Duration, exec_timeout: Duration,
encoded_params: Vec<u8>, encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError>; ) -> Result<WasmValidationResult, ValidationError>;
@@ -677,16 +687,16 @@ trait ValidationBackend {
async fn validate_candidate_with_retry( async fn validate_candidate_with_retry(
&mut self, &mut self,
raw_validation_code: Vec<u8>, raw_validation_code: Vec<u8>,
timeout: Duration, exec_timeout: Duration,
params: ValidationParams, params: ValidationParams,
executor_params: ExecutorParams, executor_params: ExecutorParams,
) -> Result<WasmValidationResult, ValidationError> { ) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf_with_params = let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient);
PvfWithExecutorParams::from_code(raw_validation_code, executor_params); let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout);
let mut validation_result = let mut validation_result =
self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await; self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient. Note that // assumption that the conditions that caused this error may have been transient. Note that
@@ -699,23 +709,19 @@ trait ValidationBackend {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
?pvf_with_params, ?pvf,
"Re-trying failed candidate validation due to AmbiguousWorkerDeath." "Re-trying failed candidate validation due to AmbiguousWorkerDeath."
); );
// Encode the params again when re-trying. We expect the retry case to be relatively // Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data. // rare, and we want to avoid unconditionally cloning data.
validation_result = validation_result = self.validate_candidate(pvf, exec_timeout, params.encode()).await;
self.validate_candidate(pvf_with_params, timeout, params.encode()).await;
} }
validation_result validation_result
} }
async fn precheck_pvf( async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError>;
&mut self,
pvf_with_params: PvfWithExecutorParams,
) -> Result<PrepareStats, PrepareError>;
} }
#[async_trait] #[async_trait]
@@ -723,16 +729,14 @@ impl ValidationBackend for ValidationHost {
/// Tries executing a PVF a single time (no retries). /// Tries executing a PVF a single time (no retries).
async fn validate_candidate( async fn validate_candidate(
&mut self, &mut self,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
timeout: Duration, exec_timeout: Duration,
encoded_params: Vec<u8>, encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> { ) -> Result<WasmValidationResult, ValidationError> {
let priority = polkadot_node_core_pvf::Priority::Normal; let priority = polkadot_node_core_pvf::Priority::Normal;
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
if let Err(err) = if let Err(err) = self.execute_pvf(pvf, exec_timeout, encoded_params, priority, tx).await {
self.execute_pvf(pvf_with_params, timeout, encoded_params, priority, tx).await
{
return Err(ValidationError::InternalError(format!( return Err(ValidationError::InternalError(format!(
"cannot send pvf to the validation host: {:?}", "cannot send pvf to the validation host: {:?}",
err err
@@ -743,12 +747,9 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
} }
async fn precheck_pvf( async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
&mut self,
pvf_with_params: PvfWithExecutorParams,
) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf_with_params, tx).await { if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host. // Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr(err)) return Err(PrepareError::IoErr(err))
} }
@@ -788,3 +789,23 @@ fn perform_basic_checks(
Ok(()) Ok(())
} }
/// Resolves the PVF preparation timeout for the given `kind`.
///
/// A timeout supplied by `executor_params` for that kind takes precedence. When the executor
/// parameters provide none, the hard-coded default matching `kind` is returned instead.
fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepTimeoutKind) -> Duration {
	executor_params.pvf_prep_timeout(kind).unwrap_or_else(|| match kind {
		PvfPrepTimeoutKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
		PvfPrepTimeoutKind::Lenient => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
	})
}
/// Resolves the PVF execution timeout for the given `kind`.
///
/// A timeout supplied by `executor_params` for that kind takes precedence. When the executor
/// parameters provide none, the hard-coded default matching `kind` is returned instead.
fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: PvfExecTimeoutKind) -> Duration {
	executor_params.pvf_exec_timeout(kind).unwrap_or_else(|| match kind {
		PvfExecTimeoutKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
		PvfExecTimeoutKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
	})
}
@@ -396,7 +396,7 @@ impl MockValidateCandidateBackend {
impl ValidationBackend for MockValidateCandidateBackend { impl ValidationBackend for MockValidateCandidateBackend {
async fn validate_candidate( async fn validate_candidate(
&mut self, &mut self,
_pvf_with_params: PvfWithExecutorParams, _pvf: PvfPrepData,
_timeout: Duration, _timeout: Duration,
_encoded_params: Vec<u8>, _encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> { ) -> Result<WasmValidationResult, ValidationError> {
@@ -408,10 +408,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result result
} }
async fn precheck_pvf( async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
&mut self,
_pvf_with_params: PvfWithExecutorParams,
) -> Result<PrepareStats, PrepareError> {
unreachable!() unreachable!()
} }
} }
@@ -476,7 +473,7 @@ fn candidate_validation_ok_is_ok() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}) })
@@ -535,7 +532,7 @@ fn candidate_validation_bad_return_is_invalid() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}); });
@@ -606,7 +603,7 @@ fn candidate_validation_one_ambiguous_error_is_valid() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}) })
@@ -666,7 +663,7 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}) })
@@ -718,7 +715,7 @@ fn candidate_validation_timeout_is_internal_error() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}); });
@@ -770,7 +767,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}) })
@@ -821,7 +818,7 @@ fn candidate_validation_code_mismatch_is_invalid() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&Default::default(), &Default::default(),
)) ))
.unwrap(); .unwrap();
@@ -884,7 +881,7 @@ fn compressed_code_works() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&metrics, &metrics,
) )
}); });
@@ -937,7 +934,7 @@ fn code_decompression_failure_is_error() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&Default::default(), &Default::default(),
)); ));
@@ -990,7 +987,7 @@ fn pov_decompression_failure_is_invalid() {
validation_code, validation_code,
candidate_receipt, candidate_receipt,
Arc::new(pov), Arc::new(pov),
Duration::from_secs(0), PvfExecTimeoutKind::Backing,
&Default::default(), &Default::default(),
)); ));
@@ -1011,17 +1008,14 @@ impl MockPreCheckBackend {
impl ValidationBackend for MockPreCheckBackend { impl ValidationBackend for MockPreCheckBackend {
async fn validate_candidate( async fn validate_candidate(
&mut self, &mut self,
_pvf_with_params: PvfWithExecutorParams, _pvf: PvfPrepData,
_timeout: Duration, _timeout: Duration,
_encoded_params: Vec<u8>, _encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> { ) -> Result<WasmValidationResult, ValidationError> {
unreachable!() unreachable!()
} }
async fn precheck_pvf( async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
&mut self,
_pvf_with_params: PvfWithExecutorParams,
) -> Result<PrepareStats, PrepareError> {
self.result.clone() self.result.clone()
} }
} }
@@ -25,13 +25,15 @@ use futures::{
#[cfg(test)] #[cfg(test)]
use futures_timer::Delay; use futures_timer::Delay;
use polkadot_node_primitives::{ValidationResult, APPROVAL_EXECUTION_TIMEOUT}; use polkadot_node_primitives::ValidationResult;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::{AvailabilityRecoveryMessage, CandidateValidationMessage}, messages::{AvailabilityRecoveryMessage, CandidateValidationMessage},
overseer, ActiveLeavesUpdate, RecoveryError, overseer, ActiveLeavesUpdate, RecoveryError,
}; };
use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash;
use polkadot_primitives::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; use polkadot_primitives::{
BlockNumber, CandidateHash, CandidateReceipt, Hash, PvfExecTimeoutKind, SessionIndex,
};
use crate::LOG_TARGET; use crate::LOG_TARGET;
@@ -348,7 +350,7 @@ async fn participate(
validation_code, validation_code,
req.candidate_receipt().clone(), req.candidate_receipt().clone(),
available_data.pov, available_data.pov,
APPROVAL_EXECUTION_TIMEOUT, PvfExecTimeoutKind::Approval,
validation_tx, validation_tx,
)) ))
.await; .await;
@@ -120,7 +120,7 @@ pub async fn participation_full_happy_path(
ctx_handle.recv().await, ctx_handle.recv().await,
AllMessages::CandidateValidation( AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx)
) if timeout == APPROVAL_EXECUTION_TIMEOUT => { ) if timeout == PvfExecTimeoutKind::Approval => {
if expected_commitments_hash != candidate_receipt.commitments_hash { if expected_commitments_hash != candidate_receipt.commitments_hash {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap();
} else { } else {
@@ -454,7 +454,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() {
ctx_handle.recv().await, ctx_handle.recv().await,
AllMessages::CandidateValidation( AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)
) if timeout == APPROVAL_EXECUTION_TIMEOUT => { ) if timeout == PvfExecTimeoutKind::Approval => {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))).unwrap(); tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))).unwrap();
}, },
"overseer did not receive candidate validation message", "overseer did not receive candidate validation message",
@@ -491,7 +491,7 @@ fn cast_invalid_vote_if_commitments_dont_match() {
ctx_handle.recv().await, ctx_handle.recv().await,
AllMessages::CandidateValidation( AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)
) if timeout == APPROVAL_EXECUTION_TIMEOUT => { ) if timeout == PvfExecTimeoutKind::Approval => {
tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap(); tx.send(Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))).unwrap();
}, },
"overseer did not receive candidate validation message", "overseer did not receive candidate validation message",
@@ -528,7 +528,7 @@ fn cast_valid_vote_if_validation_passes() {
ctx_handle.recv().await, ctx_handle.recv().await,
AllMessages::CandidateValidation( AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx) CandidateValidationMessage::ValidateFromExhaustive(_, _, _, _, timeout, tx)
) if timeout == APPROVAL_EXECUTION_TIMEOUT => { ) if timeout == PvfExecTimeoutKind::Approval => {
tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap();
}, },
"overseer did not receive candidate validation message", "overseer did not receive candidate validation message",
+8 -15
View File
@@ -30,7 +30,6 @@ use futures::{
stream::{FuturesUnordered, StreamExt as _}, stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt, Future, FutureExt,
}; };
use polkadot_node_primitives::BACKING_EXECUTION_TIMEOUT;
use polkadot_primitives::vstaging::{ExecutorParams, ExecutorParamsHash}; use polkadot_primitives::vstaging::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap; use slotmap::HopSlotMap;
use std::{ use std::{
@@ -45,8 +44,7 @@ use std::{
/// re-spawn a new worker to execute the job immediately. /// re-spawn a new worker to execute the job immediately.
/// To make any sense and not to break things, the value should be greater than minimal execution /// To make any sense and not to break things, the value should be greater than minimal execution
/// timeout in use, and less than the block time. /// timeout in use, and less than the block time.
const MAX_KEEP_WAITING: Duration = const MAX_KEEP_WAITING: Duration = Duration::from_secs(4);
Duration::from_millis(BACKING_EXECUTION_TIMEOUT.as_millis() as u64 * 2);
slotmap::new_key_type! { struct Worker; } slotmap::new_key_type! { struct Worker; }
@@ -54,7 +52,7 @@ slotmap::new_key_type! { struct Worker; }
pub enum ToQueue { pub enum ToQueue {
Enqueue { Enqueue {
artifact: ArtifactPathId, artifact: ArtifactPathId,
execution_timeout: Duration, exec_timeout: Duration,
params: Vec<u8>, params: Vec<u8>,
executor_params: ExecutorParams, executor_params: ExecutorParams,
result_tx: ResultSender, result_tx: ResultSender,
@@ -63,7 +61,7 @@ pub enum ToQueue {
struct ExecuteJob { struct ExecuteJob {
artifact: ArtifactPathId, artifact: ArtifactPathId,
execution_timeout: Duration, exec_timeout: Duration,
params: Vec<u8>, params: Vec<u8>,
executor_params: ExecutorParams, executor_params: ExecutorParams,
result_tx: ResultSender, result_tx: ResultSender,
@@ -261,8 +259,7 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
} }
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, execution_timeout, params, executor_params, result_tx } = let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue;
to_queue;
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash, validation_code_hash = ?artifact.id.code_hash,
@@ -271,7 +268,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
queue.metrics.execute_enqueued(); queue.metrics.execute_enqueued();
let job = ExecuteJob { let job = ExecuteJob {
artifact, artifact,
execution_timeout, exec_timeout,
params, params,
executor_params, executor_params,
result_tx, result_tx,
@@ -457,13 +454,9 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
queue.mux.push( queue.mux.push(
async move { async move {
let _timer = execution_timer; let _timer = execution_timer;
let outcome = super::worker::start_work( let outcome =
idle, super::worker::start_work(idle, job.artifact.clone(), job.exec_timeout, job.params)
job.artifact.clone(), .await;
job.execution_timeout,
job.params,
)
.await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx) QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
} }
.boxed(), .boxed(),
@@ -126,6 +126,7 @@ fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, Strin
ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm, ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm,
ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm, ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm,
ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet
ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here
} }
} }
sem.deterministic_stack_limit = Some(stack_limit); sem.deterministic_stack_limit = Some(stack_limit);
+69 -119
View File
@@ -25,7 +25,7 @@ use crate::{
error::PrepareError, error::PrepareError,
execute, execute,
metrics::Metrics, metrics::Metrics,
prepare, PrepareResult, Priority, PvfWithExecutorParams, ValidationError, LOG_TARGET, prepare, PrepareResult, Priority, PvfPrepData, ValidationError, LOG_TARGET,
}; };
use always_assert::never; use always_assert::never;
use futures::{ use futures::{
@@ -40,17 +40,6 @@ use std::{
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
/// For prechecking requests, the time period after which the preparation worker is considered
/// unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
pub const PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);
/// For execution and heads-up requests, the time period after which the preparation worker is
/// considered unresponsive and will be killed. More lenient than the timeout for prechecking to
/// prevent honest validators from timing out on valid PVFs.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);
/// The time period after which a failed preparation artifact is considered ready to be retried. /// The time period after which a failed preparation artifact is considered ready to be retried.
/// Note that we will only retry if another request comes in after this cooldown has passed. /// Note that we will only retry if another request comes in after this cooldown has passed.
#[cfg(not(test))] #[cfg(not(test))]
@@ -84,11 +73,11 @@ impl ValidationHost {
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn precheck_pvf( pub async fn precheck_pvf(
&mut self, &mut self,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
result_tx: PrepareResultSender, result_tx: PrepareResultSender,
) -> Result<(), String> { ) -> Result<(), String> {
self.to_host_tx self.to_host_tx
.send(ToHost::PrecheckPvf { pvf_with_params, result_tx }) .send(ToHost::PrecheckPvf { pvf, result_tx })
.await .await
.map_err(|_| "the inner loop hung up".to_string()) .map_err(|_| "the inner loop hung up".to_string())
} }
@@ -102,16 +91,16 @@ impl ValidationHost {
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn execute_pvf( pub async fn execute_pvf(
&mut self, &mut self,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
execution_timeout: Duration, exec_timeout: Duration,
params: Vec<u8>, params: Vec<u8>,
priority: Priority, priority: Priority,
result_tx: ResultSender, result_tx: ResultSender,
) -> Result<(), String> { ) -> Result<(), String> {
self.to_host_tx self.to_host_tx
.send(ToHost::ExecutePvf(ExecutePvfInputs { .send(ToHost::ExecutePvf(ExecutePvfInputs {
pvf_with_params, pvf,
execution_timeout, exec_timeout,
params, params,
priority, priority,
result_tx, result_tx,
@@ -126,10 +115,7 @@ impl ValidationHost {
/// situations this function should return immediately. /// situations this function should return immediately.
/// ///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn heads_up( pub async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
&mut self,
active_pvfs: Vec<PvfWithExecutorParams>,
) -> Result<(), String> {
self.to_host_tx self.to_host_tx
.send(ToHost::HeadsUp { active_pvfs }) .send(ToHost::HeadsUp { active_pvfs })
.await .await
@@ -138,14 +124,14 @@ impl ValidationHost {
} }
enum ToHost { enum ToHost {
PrecheckPvf { pvf_with_params: PvfWithExecutorParams, result_tx: PrepareResultSender }, PrecheckPvf { pvf: PvfPrepData, result_tx: PrepareResultSender },
ExecutePvf(ExecutePvfInputs), ExecutePvf(ExecutePvfInputs),
HeadsUp { active_pvfs: Vec<PvfWithExecutorParams> }, HeadsUp { active_pvfs: Vec<PvfPrepData> },
} }
struct ExecutePvfInputs { struct ExecutePvfInputs {
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
execution_timeout: Duration, exec_timeout: Duration,
params: Vec<u8>, params: Vec<u8>,
priority: Priority, priority: Priority,
result_tx: ResultSender, result_tx: ResultSender,
@@ -267,7 +253,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
/// to the given result sender. /// to the given result sender.
#[derive(Debug)] #[derive(Debug)]
struct PendingExecutionRequest { struct PendingExecutionRequest {
execution_timeout: Duration, exec_timeout: Duration,
params: Vec<u8>, params: Vec<u8>,
executor_params: ExecutorParams, executor_params: ExecutorParams,
result_tx: ResultSender, result_tx: ResultSender,
@@ -282,13 +268,13 @@ impl AwaitingPrepare {
fn add( fn add(
&mut self, &mut self,
artifact_id: ArtifactId, artifact_id: ArtifactId,
execution_timeout: Duration, exec_timeout: Duration,
params: Vec<u8>, params: Vec<u8>,
executor_params: ExecutorParams, executor_params: ExecutorParams,
result_tx: ResultSender, result_tx: ResultSender,
) { ) {
self.0.entry(artifact_id).or_default().push(PendingExecutionRequest { self.0.entry(artifact_id).or_default().push(PendingExecutionRequest {
execution_timeout, exec_timeout,
params, params,
executor_params, executor_params,
result_tx, result_tx,
@@ -427,8 +413,8 @@ async fn handle_to_host(
to_host: ToHost, to_host: ToHost,
) -> Result<(), Fatal> { ) -> Result<(), Fatal> {
match to_host { match to_host {
ToHost::PrecheckPvf { pvf_with_params, result_tx } => { ToHost::PrecheckPvf { pvf, result_tx } => {
handle_precheck_pvf(artifacts, prepare_queue, pvf_with_params, result_tx).await?; handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
}, },
ToHost::ExecutePvf(inputs) => { ToHost::ExecutePvf(inputs) => {
handle_execute_pvf( handle_execute_pvf(
@@ -450,16 +436,17 @@ async fn handle_to_host(
/// Handles PVF prechecking requests. /// Handles PVF prechecking requests.
/// ///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]). /// This tries to prepare the PVF by compiling the WASM blob within a timeout set in
/// `PvfPrepData`.
/// ///
/// If the prepare job failed previously, we may retry it under certain conditions. /// If the prepare job failed previously, we may retry it under certain conditions.
async fn handle_precheck_pvf( async fn handle_precheck_pvf(
artifacts: &mut Artifacts, artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>, prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
result_sender: PrepareResultSender, result_sender: PrepareResultSender,
) -> Result<(), Fatal> { ) -> Result<(), Fatal> {
let artifact_id = pvf_with_params.as_artifact_id(); let artifact_id = pvf.as_artifact_id();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state { match state {
@@ -477,15 +464,8 @@ async fn handle_precheck_pvf(
} }
} else { } else {
artifacts.insert_preparing(artifact_id, vec![result_sender]); artifacts.insert_preparing(artifact_id, vec![result_sender]);
send_prepare( send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
prepare_queue, .await?;
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf_with_params,
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
},
)
.await?;
} }
Ok(()) Ok(())
} }
@@ -507,9 +487,8 @@ async fn handle_execute_pvf(
awaiting_prepare: &mut AwaitingPrepare, awaiting_prepare: &mut AwaitingPrepare,
inputs: ExecutePvfInputs, inputs: ExecutePvfInputs,
) -> Result<(), Fatal> { ) -> Result<(), Fatal> {
let ExecutePvfInputs { pvf_with_params, execution_timeout, params, priority, result_tx } = let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs;
inputs; let artifact_id = pvf.as_artifact_id();
let artifact_id = pvf_with_params.as_artifact_id();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state { match state {
@@ -521,9 +500,9 @@ async fn handle_execute_pvf(
execute_queue, execute_queue,
execute::ToQueue::Enqueue { execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, cache_path), artifact: ArtifactPathId::new(artifact_id, cache_path),
execution_timeout, exec_timeout,
params, params,
executor_params: (*pvf_with_params.executor_params()).clone(), executor_params: (*pvf.executor_params()).clone(),
result_tx, result_tx,
}, },
) )
@@ -532,9 +511,9 @@ async fn handle_execute_pvf(
ArtifactState::Preparing { .. } => { ArtifactState::Preparing { .. } => {
awaiting_prepare.add( awaiting_prepare.add(
artifact_id, artifact_id,
execution_timeout, exec_timeout,
params, params,
(*pvf_with_params.executor_params()).clone(), (*pvf.executor_params()).clone(),
result_tx, result_tx,
); );
}, },
@@ -542,7 +521,7 @@ async fn handle_execute_pvf(
if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) { if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
?pvf_with_params, ?pvf,
?artifact_id, ?artifact_id,
?last_time_failed, ?last_time_failed,
%num_failures, %num_failures,
@@ -556,22 +535,15 @@ async fn handle_execute_pvf(
waiting_for_response: Vec::new(), waiting_for_response: Vec::new(),
num_failures: *num_failures, num_failures: *num_failures,
}; };
let executor_params = (*pvf_with_params.executor_params()).clone(); let executor_params = (*pvf.executor_params()).clone();
send_prepare( send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf })
prepare_queue, .await?;
prepare::ToQueue::Enqueue {
priority,
pvf_with_params,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
// Add an execution request that will wait to run after this prepare job has // Add an execution request that will wait to run after this prepare job has
// finished. // finished.
awaiting_prepare.add( awaiting_prepare.add(
artifact_id, artifact_id,
execution_timeout, exec_timeout,
params, params,
executor_params, executor_params,
result_tx, result_tx,
@@ -584,20 +556,12 @@ async fn handle_execute_pvf(
} else { } else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and // Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF. // PVF.
let executor_params = (*pvf_with_params.executor_params()).clone(); let executor_params = (*pvf.executor_params()).clone();
artifacts.insert_preparing(artifact_id.clone(), Vec::new()); artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare( send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
pvf_with_params,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
// Add an execution request that will wait to run after this prepare job has finished. // Add an execution request that will wait to run after this prepare job has finished.
awaiting_prepare.add(artifact_id, execution_timeout, params, executor_params, result_tx); awaiting_prepare.add(artifact_id, exec_timeout, params, executor_params, result_tx);
} }
Ok(()) Ok(())
@@ -606,7 +570,7 @@ async fn handle_execute_pvf(
async fn handle_heads_up( async fn handle_heads_up(
artifacts: &mut Artifacts, artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>, prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
active_pvfs: Vec<PvfWithExecutorParams>, active_pvfs: Vec<PvfPrepData>,
) -> Result<(), Fatal> { ) -> Result<(), Fatal> {
let now = SystemTime::now(); let now = SystemTime::now();
@@ -642,8 +606,7 @@ async fn handle_heads_up(
prepare_queue, prepare_queue,
prepare::ToQueue::Enqueue { prepare::ToQueue::Enqueue {
priority: Priority::Normal, priority: Priority::Normal,
pvf_with_params: active_pvf, pvf: active_pvf,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
}, },
) )
.await?; .await?;
@@ -656,11 +619,7 @@ async fn handle_heads_up(
send_prepare( send_prepare(
prepare_queue, prepare_queue,
prepare::ToQueue::Enqueue { prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
priority: Priority::Normal,
pvf_with_params: active_pvf,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
) )
.await?; .await?;
} }
@@ -722,7 +681,7 @@ async fn handle_prepare_done(
// It's finally time to dispatch all the execution requests that were waiting for this artifact // It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared. // to be prepared.
let pending_requests = awaiting_prepare.take(&artifact_id); let pending_requests = awaiting_prepare.take(&artifact_id);
for PendingExecutionRequest { execution_timeout, params, executor_params, result_tx } in for PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } in
pending_requests pending_requests
{ {
if result_tx.is_canceled() { if result_tx.is_canceled() {
@@ -741,7 +700,7 @@ async fn handle_prepare_done(
execute_queue, execute_queue,
execute::ToQueue::Enqueue { execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path), artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
execution_timeout, exec_timeout,
params, params,
executor_params, executor_params,
result_tx, result_tx,
@@ -858,13 +817,14 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub(crate) mod tests {
use super::*; use super::*;
use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError}; use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures::future::BoxFuture; use futures::future::BoxFuture;
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
#[tokio::test] #[tokio::test]
async fn pulse_test() { async fn pulse_test() {
@@ -882,7 +842,7 @@ mod tests {
/// Creates a new PVF which artifact id can be uniquely identified by the given number. /// Creates a new PVF which artifact id can be uniquely identified by the given number.
fn artifact_id(descriminator: u32) -> ArtifactId { fn artifact_id(descriminator: u32) -> ArtifactId {
PvfWithExecutorParams::from_discriminator(descriminator).as_artifact_id() PvfPrepData::from_discriminator(descriminator).as_artifact_id()
} }
fn artifact_path(descriminator: u32) -> PathBuf { fn artifact_path(descriminator: u32) -> PathBuf {
@@ -1091,7 +1051,7 @@ mod tests {
let mut test = builder.build(); let mut test = builder.build();
let mut host = test.host_handle(); let mut host = test.host_handle();
host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
let to_sweeper_rx = &mut test.to_sweeper_rx; let to_sweeper_rx = &mut test.to_sweeper_rx;
run_until( run_until(
@@ -1105,7 +1065,7 @@ mod tests {
// Extend TTL for the first artifact and make sure we don't receive another file removal // Extend TTL for the first artifact and make sure we don't receive another file removal
// request. // request.
host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
test.poll_ensure_to_sweeper_is_empty().await; test.poll_ensure_to_sweeper_is_empty().await;
} }
@@ -1116,7 +1076,7 @@ mod tests {
let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(), b"pvf1".to_vec(),
Priority::Normal, Priority::Normal,
@@ -1127,7 +1087,7 @@ mod tests {
let (result_tx, result_rx_pvf_1_2) = oneshot::channel(); let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(), b"pvf1".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1138,7 +1098,7 @@ mod tests {
let (result_tx, result_rx_pvf_2) = oneshot::channel(); let (result_tx, result_rx_pvf_2) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(2), PvfPrepData::from_discriminator(2),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(), b"pvf2".to_vec(),
Priority::Normal, Priority::Normal,
@@ -1216,9 +1176,7 @@ mod tests {
// First, test a simple precheck request. // First, test a simple precheck request.
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx).await.unwrap();
.await
.unwrap();
// The queue received the prepare request. // The queue received the prepare request.
assert_matches!( assert_matches!(
@@ -1242,9 +1200,7 @@ mod tests {
let mut precheck_receivers = Vec::new(); let mut precheck_receivers = Vec::new();
for _ in 0..3 { for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx) host.precheck_pvf(PvfPrepData::from_discriminator(2), result_tx).await.unwrap();
.await
.unwrap();
precheck_receivers.push(result_rx); precheck_receivers.push(result_rx);
} }
// Received prepare request. // Received prepare request.
@@ -1279,7 +1235,7 @@ mod tests {
// Send PVF for the execution and request the prechecking for it. // Send PVF for the execution and request the prechecking for it.
let (result_tx, result_rx_execute) = oneshot::channel(); let (result_tx, result_rx_execute) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(), b"pvf2".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1294,9 +1250,7 @@ mod tests {
); );
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx).await.unwrap();
.await
.unwrap();
// Suppose the preparation failed, the execution queue is empty and both // Suppose the preparation failed, the execution queue is empty and both
// "clients" receive their results. // "clients" receive their results.
@@ -1318,15 +1272,13 @@ mod tests {
let mut precheck_receivers = Vec::new(); let mut precheck_receivers = Vec::new();
for _ in 0..3 { for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx) host.precheck_pvf(PvfPrepData::from_discriminator(2), result_tx).await.unwrap();
.await
.unwrap();
precheck_receivers.push(result_rx); precheck_receivers.push(result_rx);
} }
let (result_tx, _result_rx_execute) = oneshot::channel(); let (result_tx, _result_rx_execute) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(2), PvfPrepData::from_discriminator(2),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(), b"pvf2".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1366,9 +1318,7 @@ mod tests {
// Submit a precheck request that fails. // Submit a precheck request that fails.
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx) host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx).await.unwrap();
.await
.unwrap();
// The queue received the prepare request. // The queue received the prepare request.
assert_matches!( assert_matches!(
@@ -1390,7 +1340,7 @@ mod tests {
// Submit another precheck request. // Submit another precheck request.
let (result_tx_2, result_rx_2) = oneshot::channel(); let (result_tx_2, result_rx_2) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_2) host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx_2)
.await .await
.unwrap(); .unwrap();
@@ -1406,7 +1356,7 @@ mod tests {
// Submit another precheck request. // Submit another precheck request.
let (result_tx_3, result_rx_3) = oneshot::channel(); let (result_tx_3, result_rx_3) = oneshot::channel();
host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_3) host.precheck_pvf(PvfPrepData::from_discriminator(1), result_tx_3)
.await .await
.unwrap(); .unwrap();
@@ -1428,7 +1378,7 @@ mod tests {
// Submit a execute request that fails. // Submit a execute request that fails.
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(), b"pvf".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1458,7 +1408,7 @@ mod tests {
// Submit another execute request. We shouldn't try to prepare again, yet. // Submit another execute request. We shouldn't try to prepare again, yet.
let (result_tx_2, result_rx_2) = oneshot::channel(); let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(), b"pvf".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1480,7 +1430,7 @@ mod tests {
// Submit another execute request. // Submit another execute request.
let (result_tx_3, result_rx_3) = oneshot::channel(); let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(), b"pvf".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1530,7 +1480,7 @@ mod tests {
// Submit an execute request that fails. // Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(), b"pvf".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1563,7 +1513,7 @@ mod tests {
// Submit another execute request. // Submit another execute request.
let (result_tx_2, result_rx_2) = oneshot::channel(); let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(), b"pvf".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1588,7 +1538,7 @@ mod tests {
// Submit another execute request. // Submit another execute request.
let (result_tx_3, result_rx_3) = oneshot::channel(); let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(), b"pvf".to_vec(),
Priority::Critical, Priority::Critical,
@@ -1615,7 +1565,7 @@ mod tests {
let mut host = test.host_handle(); let mut host = test.host_handle();
// Submit a heads-up request that fails. // Submit a heads-up request that fails.
host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
// The queue received the prepare request. // The queue received the prepare request.
assert_matches!( assert_matches!(
@@ -1632,7 +1582,7 @@ mod tests {
.unwrap(); .unwrap();
// Submit another heads-up request. // Submit another heads-up request.
host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
// Assert the prepare queue is empty. // Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await; test.poll_ensure_to_prepare_queue_is_empty().await;
@@ -1641,7 +1591,7 @@ mod tests {
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await; futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another heads-up request. // Submit another heads-up request.
host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap(); host.heads_up(vec![PvfPrepData::from_discriminator(1)]).await.unwrap();
// Assert the prepare queue contains the request. // Assert the prepare queue contains the request.
assert_matches!( assert_matches!(
@@ -1657,7 +1607,7 @@ mod tests {
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf( host.execute_pvf(
PvfWithExecutorParams::from_discriminator(1), PvfPrepData::from_discriminator(1),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(), b"pvf1".to_vec(),
Priority::Normal, Priority::Normal,
+1 -1
View File
@@ -110,7 +110,7 @@ pub use sp_tracing;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats; pub use prepare::PrepareStats;
pub use priority::Priority; pub use priority::Priority;
pub use pvf::PvfWithExecutorParams; pub use pvf::PvfPrepData;
pub use host::{start, Config, ValidationHost}; pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics; pub use metrics::Metrics;
+6 -5
View File
@@ -183,9 +183,9 @@ impl metrics::Metrics for Metrics {
"Time spent in preparing PVF artifacts in seconds", "Time spent in preparing PVF artifacts in seconds",
) )
.buckets(vec![ .buckets(vec![
// This is synchronized with the PRECHECK_PREPARATION_TIMEOUT=60s // This is synchronized with the `DEFAULT_PRECHECK_PREPARATION_TIMEOUT=60s`
// and LENIENT_PREPARATION_TIMEOUT=360s constants found in // and `DEFAULT_LENIENT_PREPARATION_TIMEOUT=360s` constants found in
// src/prepare/worker.rs // node/core/candidate-validation/src/lib.rs
0.1, 0.1,
0.5, 0.5,
1.0, 1.0,
@@ -209,8 +209,9 @@ impl metrics::Metrics for Metrics {
"polkadot_pvf_execution_time", "polkadot_pvf_execution_time",
"Time spent in executing PVFs", "Time spent in executing PVFs",
).buckets(vec![ ).buckets(vec![
// This is synchronized with `APPROVAL_EXECUTION_TIMEOUT` and // This is synchronized with `DEFAULT_APPROVAL_EXECUTION_TIMEOUT` and
// `BACKING_EXECUTION_TIMEOUT` constants in `node/primitives/src/lib.rs` // `DEFAULT_BACKING_EXECUTION_TIMEOUT` constants in
// node/core/candidate-validation/src/lib.rs
0.01, 0.01,
0.025, 0.025,
0.05, 0.05,
+6 -21
View File
@@ -18,7 +18,7 @@ use super::worker::{self, Outcome};
use crate::{ use crate::{
error::{PrepareError, PrepareResult}, error::{PrepareError, PrepareResult},
metrics::Metrics, metrics::Metrics,
pvf::PvfWithExecutorParams, pvf::PvfPrepData,
worker_common::{IdleWorker, WorkerHandle}, worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET, LOG_TARGET,
}; };
@@ -65,12 +65,7 @@ pub enum ToPool {
/// ///
/// In either case, the worker is considered busy and no further `StartWork` messages should be /// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received. /// sent until either `Concluded` or `Rip` message is received.
StartWork { StartWork { worker: Worker, pvf: PvfPrepData, artifact_path: PathBuf },
worker: Worker,
pvf_with_params: PvfWithExecutorParams,
artifact_path: PathBuf,
preparation_timeout: Duration,
},
} }
/// A message sent from pool to its client. /// A message sent from pool to its client.
@@ -214,7 +209,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn(); metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
}, },
ToPool::StartWork { worker, pvf_with_params, artifact_path, preparation_timeout } => { ToPool::StartWork { worker, pvf, artifact_path } => {
if let Some(data) = spawned.get_mut(worker) { if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() { if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation(); let preparation_timer = metrics.time_preparation();
@@ -223,10 +218,9 @@ fn handle_to_pool(
metrics.clone(), metrics.clone(),
worker, worker,
idle, idle,
pvf_with_params, pvf,
cache_path.to_owned(), cache_path.to_owned(),
artifact_path, artifact_path,
preparation_timeout,
preparation_timer, preparation_timer,
) )
.boxed(), .boxed(),
@@ -272,21 +266,12 @@ async fn start_work_task<Timer>(
metrics: Metrics, metrics: Metrics,
worker: Worker, worker: Worker,
idle: IdleWorker, idle: IdleWorker,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
cache_path: PathBuf, cache_path: PathBuf,
artifact_path: PathBuf, artifact_path: PathBuf,
preparation_timeout: Duration,
_preparation_timer: Option<Timer>, _preparation_timer: Option<Timer>,
) -> PoolEvent { ) -> PoolEvent {
let outcome = worker::start_work( let outcome = worker::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
&metrics,
idle,
pvf_with_params,
&cache_path,
artifact_path,
preparation_timeout,
)
.await;
PoolEvent::StartWork(worker, outcome) PoolEvent::StartWork(worker, outcome)
} }
+33 -85
View File
@@ -18,17 +18,18 @@
use super::pool::{self, Worker}; use super::pool::{self, Worker};
use crate::{ use crate::{
artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfWithExecutorParams, artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfPrepData, LOG_TARGET,
LOG_TARGET,
}; };
use always_assert::{always, never}; use always_assert::{always, never};
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
path::PathBuf, path::PathBuf,
time::Duration,
}; };
#[cfg(test)]
use std::time::Duration;
/// A request to pool. /// A request to pool.
#[derive(Debug)] #[derive(Debug)]
pub enum ToQueue { pub enum ToQueue {
@@ -36,11 +37,7 @@ pub enum ToQueue {
/// ///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the /// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response. /// [`FromQueue`] response.
Enqueue { Enqueue { priority: Priority, pvf: PvfPrepData },
priority: Priority,
pvf_with_params: PvfWithExecutorParams,
preparation_timeout: Duration,
},
} }
/// A response from queue. /// A response from queue.
@@ -85,9 +82,7 @@ slotmap::new_key_type! { pub struct Job; }
struct JobData { struct JobData {
/// The priority of this job. Can be bumped. /// The priority of this job. Can be bumped.
priority: Priority, priority: Priority,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
/// The timeout for the preparation job.
preparation_timeout: Duration,
worker: Option<Worker>, worker: Option<Worker>,
} }
@@ -215,8 +210,8 @@ impl Queue {
async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> { async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
match to_queue { match to_queue {
ToQueue::Enqueue { priority, pvf_with_params, preparation_timeout } => { ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf_with_params, preparation_timeout).await?; handle_enqueue(queue, priority, pvf).await?;
}, },
} }
Ok(()) Ok(())
@@ -225,19 +220,18 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
async fn handle_enqueue( async fn handle_enqueue(
queue: &mut Queue, queue: &mut Queue,
priority: Priority, priority: Priority,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
preparation_timeout: Duration,
) -> Result<(), Fatal> { ) -> Result<(), Fatal> {
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
validation_code_hash = ?pvf_with_params.code_hash(), validation_code_hash = ?pvf.code_hash(),
?priority, ?priority,
?preparation_timeout, preparation_timeout = ?pvf.prep_timeout,
"PVF is enqueued for preparation.", "PVF is enqueued for preparation.",
); );
queue.metrics.prepare_enqueued(); queue.metrics.prepare_enqueued();
let artifact_id = pvf_with_params.as_artifact_id(); let artifact_id = pvf.as_artifact_id();
if never!( if never!(
queue.artifact_id_to_job.contains_key(&artifact_id), queue.artifact_id_to_job.contains_key(&artifact_id),
"second Enqueue sent for a known artifact" "second Enqueue sent for a known artifact"
@@ -254,10 +248,7 @@ async fn handle_enqueue(
return Ok(()) return Ok(())
} }
let job = let job = queue.jobs.insert(JobData { priority, pvf, worker: None });
queue
.jobs
.insert(JobData { priority, pvf_with_params, preparation_timeout, worker: None });
queue.artifact_id_to_job.insert(artifact_id, job); queue.artifact_id_to_job.insert(artifact_id, job);
if let Some(available) = find_idle_worker(queue) { if let Some(available) = find_idle_worker(queue) {
@@ -348,7 +339,7 @@ async fn handle_worker_concluded(
// this can't be None; // this can't be None;
// qed. // qed.
let job_data = never_none!(queue.jobs.remove(job)); let job_data = never_none!(queue.jobs.remove(job));
let artifact_id = job_data.pvf_with_params.as_artifact_id(); let artifact_id = job_data.pvf.as_artifact_id();
queue.artifact_id_to_job.remove(&artifact_id); queue.artifact_id_to_job.remove(&artifact_id);
@@ -434,7 +425,7 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> { async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
let job_data = &mut queue.jobs[job]; let job_data = &mut queue.jobs[job];
let artifact_id = job_data.pvf_with_params.as_artifact_id(); let artifact_id = job_data.pvf.as_artifact_id();
let artifact_path = artifact_id.path(&queue.cache_path); let artifact_path = artifact_id.path(&queue.cache_path);
job_data.worker = Some(worker); job_data.worker = Some(worker);
@@ -443,12 +434,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
send_pool( send_pool(
&mut queue.to_pool_tx, &mut queue.to_pool_tx,
pool::ToPool::StartWork { pool::ToPool::StartWork { worker, pvf: job_data.pvf.clone(), artifact_path },
worker,
pvf_with_params: job_data.pvf_with_params.clone(),
artifact_path,
preparation_timeout: job_data.preparation_timeout,
},
) )
.await?; .await?;
@@ -503,9 +489,7 @@ pub fn start(
mod tests { mod tests {
use super::*; use super::*;
use crate::{ use crate::{
error::PrepareError, error::PrepareError, host::tests::TEST_PREPARATION_TIMEOUT, prepare::PrepareStats,
host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT},
prepare::PrepareStats,
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
@@ -513,8 +497,8 @@ mod tests {
use std::task::Poll; use std::task::Poll;
/// Creates a new PVF whose artifact ID can be uniquely identified by the given number. /// Creates a new PVF whose artifact ID can be uniquely identified by the given number.
fn pvf_with_params(descriminator: u32) -> PvfWithExecutorParams { fn pvf(discriminator: u32) -> PvfPrepData {
PvfWithExecutorParams::from_discriminator(descriminator) PvfPrepData::from_discriminator(discriminator)
} }
async fn run_until<R>( async fn run_until<R>(
@@ -621,11 +605,7 @@ mod tests {
async fn properly_concludes() { async fn properly_concludes() {
let mut test = Test::new(2, 2); let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
priority: Priority::Normal,
pvf_with_params: pvf_with_params(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(()); let w = test.workers.insert(());
@@ -636,10 +616,7 @@ mod tests {
result: Ok(PrepareStats::default()), result: Ok(PrepareStats::default()),
}); });
assert_eq!( assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
test.poll_and_recv_from_queue().await.artifact_id,
pvf_with_params(1).as_artifact_id()
);
} }
#[tokio::test] #[tokio::test]
@@ -647,22 +624,12 @@ mod tests {
let mut test = Test::new(2, 3); let mut test = Test::new(2, 3);
let priority = Priority::Normal; let priority = Priority::Normal;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
priority,
pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
preparation_timeout,
});
test.send_queue(ToQueue::Enqueue {
priority,
pvf_with_params: PvfWithExecutorParams::from_discriminator(2),
preparation_timeout,
});
// Start a non-precheck preparation for this one. // Start a non-precheck preparation for this one.
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority, priority,
pvf_with_params: PvfWithExecutorParams::from_discriminator(3), pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
}); });
// Receive only two spawns. // Receive only two spawns.
@@ -690,8 +657,7 @@ mod tests {
// Enqueue a critical job. // Enqueue a critical job.
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical, priority: Priority::Critical,
pvf_with_params: PvfWithExecutorParams::from_discriminator(4), pvf: PvfPrepData::from_discriminator(4),
preparation_timeout,
}); });
// 2 out of 2 are working, but there is a critical job incoming. That means that spawning // 2 out of 2 are working, but there is a critical job incoming. That means that spawning
@@ -702,12 +668,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn cull_unwanted() { async fn cull_unwanted() {
let mut test = Test::new(1, 2); let mut test = Test::new(1, 2);
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal, priority: Priority::Normal,
pvf_with_params: PvfWithExecutorParams::from_discriminator(1), pvf: PvfPrepData::from_discriminator(1),
preparation_timeout,
}); });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(()); let w1 = test.workers.insert(());
@@ -717,8 +681,7 @@ mod tests {
// Enqueue a critical job, which warrants spawning over the soft limit. // Enqueue a critical job, which warrants spawning over the soft limit.
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical, priority: Priority::Critical,
pvf_with_params: PvfWithExecutorParams::from_discriminator(2), pvf: PvfPrepData::from_discriminator(2),
preparation_timeout,
}); });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -742,22 +705,12 @@ mod tests {
let mut test = Test::new(2, 2); let mut test = Test::new(2, 2);
let priority = Priority::Normal; let priority = Priority::Normal;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(1) });
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue { priority, pvf: PvfPrepData::from_discriminator(2) });
priority,
pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
preparation_timeout,
});
test.send_queue(ToQueue::Enqueue {
priority,
pvf_with_params: PvfWithExecutorParams::from_discriminator(2),
preparation_timeout,
});
// Start a non-precheck preparation for this one. // Start a non-precheck preparation for this one.
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority, priority,
pvf_with_params: PvfWithExecutorParams::from_discriminator(3), pvf: PvfPrepData::from_discriminator_and_timeout(3, TEST_PREPARATION_TIMEOUT * 3),
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
}); });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -782,10 +735,7 @@ mod tests {
// Since there is still work, the queue requested one extra worker to spawn to handle the // Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items. // remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!( assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
test.poll_and_recv_from_queue().await.artifact_id,
pvf_with_params(1).as_artifact_id()
);
} }
#[tokio::test] #[tokio::test]
@@ -794,8 +744,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal, priority: Priority::Normal,
pvf_with_params: PvfWithExecutorParams::from_discriminator(1), pvf: PvfPrepData::from_discriminator(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
}); });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -819,8 +768,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue { test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal, priority: Priority::Normal,
pvf_with_params: PvfWithExecutorParams::from_discriminator(1), pvf: PvfPrepData::from_discriminator(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
}); });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
+21 -36
View File
@@ -24,7 +24,7 @@ use crate::{
error::{PrepareError, PrepareResult}, error::{PrepareError, PrepareResult},
metrics::Metrics, metrics::Metrics,
prepare::PrepareStats, prepare::PrepareStats,
pvf::PvfWithExecutorParams, pvf::PvfPrepData,
worker_common::{ worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
@@ -84,10 +84,9 @@ pub enum Outcome {
pub async fn start_work( pub async fn start_work(
metrics: &Metrics, metrics: &Metrics,
worker: IdleWorker, worker: IdleWorker,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
cache_path: &Path, cache_path: &Path,
artifact_path: PathBuf, artifact_path: PathBuf,
preparation_timeout: Duration,
) -> Outcome { ) -> Outcome {
let IdleWorker { stream, pid } = worker; let IdleWorker { stream, pid } = worker;
@@ -99,9 +98,8 @@ pub async fn start_work(
); );
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
if let Err(err) = let preparation_timeout = pvf.prep_timeout;
send_request(&mut stream, pvf_with_params, &tmp_file, preparation_timeout).await if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await {
{
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %pid, worker_pid = %pid,
@@ -273,27 +271,22 @@ where
async fn send_request( async fn send_request(
stream: &mut UnixStream, stream: &mut UnixStream,
pvf_with_params: PvfWithExecutorParams, pvf: PvfPrepData,
tmp_file: &Path, tmp_file: &Path,
preparation_timeout: Duration,
) -> io::Result<()> { ) -> io::Result<()> {
framed_send(stream, &pvf_with_params.encode()).await?; framed_send(stream, &pvf.encode()).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?; framed_send(stream, path_to_bytes(tmp_file)).await?;
framed_send(stream, &preparation_timeout.encode()).await?;
Ok(()) Ok(())
} }
async fn recv_request( async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
stream: &mut UnixStream, let pvf = framed_recv(stream).await?;
) -> io::Result<(PvfWithExecutorParams, PathBuf, Duration)> { let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
let pvf_with_params = framed_recv(stream).await?; io::Error::new(
let pvf_with_params = io::ErrorKind::Other,
PvfWithExecutorParams::decode(&mut &pvf_with_params[..]).map_err(|e| { format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
io::Error::new( )
io::ErrorKind::Other, })?;
format!("prepare pvf recv_request: failed to decode PvfWithExecutorParams: {}", e),
)
})?;
let tmp_file = framed_recv(stream).await?; let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
io::Error::new( io::Error::new(
@@ -301,14 +294,7 @@ async fn recv_request(
"prepare pvf recv_request: non utf-8 artifact path".to_string(), "prepare pvf recv_request: non utf-8 artifact path".to_string(),
) )
})?; })?;
let preparation_timeout = framed_recv(stream).await?; Ok((pvf, tmp_file))
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
)
})?;
Ok((pvf_with_params, tmp_file, preparation_timeout))
} }
async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
@@ -360,7 +346,7 @@ pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
loop { loop {
let worker_pid = std::process::id(); let worker_pid = std::process::id();
let (pvf_with_params, dest, preparation_timeout) = recv_request(&mut stream).await?; let (pvf, dest) = recv_request(&mut stream).await?;
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
%worker_pid, %worker_pid,
@@ -368,6 +354,7 @@ pub fn worker_entrypoint(socket_path: &str) {
); );
let cpu_time_start = ProcessTime::now(); let cpu_time_start = ProcessTime::now();
let preparation_timeout = pvf.prep_timeout;
// Run the memory tracker. // Run the memory tracker.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
@@ -385,7 +372,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn another thread for preparation. // Spawn another thread for preparation.
let prepare_fut = rt_handle let prepare_fut = rt_handle
.spawn_blocking(move || { .spawn_blocking(move || {
let result = prepare_artifact(pvf_with_params); let result = prepare_artifact(pvf);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread. // Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@@ -467,16 +454,14 @@ pub fn worker_entrypoint(socket_path: &str) {
}); });
} }
fn prepare_artifact( fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
pvf_with_params: PvfWithExecutorParams,
) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| { panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(&pvf_with_params.code()) { let blob = match crate::executor_intf::prevalidate(&pvf.code()) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b, Ok(b) => b,
}; };
match crate::executor_intf::prepare(blob, &pvf_with_params.executor_params()) { match crate::executor_intf::prepare(blob, &pvf.executor_params()) {
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
} }
+31 -13
View File
@@ -23,26 +23,39 @@ use std::{
cmp::{Eq, PartialEq}, cmp::{Eq, PartialEq},
fmt, fmt,
sync::Arc, sync::Arc,
time::Duration,
}; };
/// A struct that carries code of a parachain validation function, its hash, and a corresponding #[cfg(test)]
/// set of executor parameters. use crate::host::tests::TEST_PREPARATION_TIMEOUT;
/// A struct that carries the complete set of data needed to prepare an artifact from a plain
/// Wasm binary.
/// ///
/// Should be cheap to clone. /// Should be cheap to clone.
#[derive(Clone, Encode, Decode)] #[derive(Clone, Encode, Decode)]
pub struct PvfWithExecutorParams { pub struct PvfPrepData {
/// Wasm code (uncompressed)
pub(crate) code: Arc<Vec<u8>>, pub(crate) code: Arc<Vec<u8>>,
/// Wasm code hash
pub(crate) code_hash: ValidationCodeHash, pub(crate) code_hash: ValidationCodeHash,
/// Executor environment parameters for the session for which artifact is prepared
pub(crate) executor_params: Arc<ExecutorParams>, pub(crate) executor_params: Arc<ExecutorParams>,
/// Preparation timeout
pub(crate) prep_timeout: Duration,
} }
impl PvfWithExecutorParams { impl PvfPrepData {
/// Returns an instance of the PVF out of the given PVF code and executor params. /// Returns an instance of the PVF out of the given PVF code and executor params.
pub fn from_code(code: Vec<u8>, executor_params: ExecutorParams) -> Self { pub fn from_code(
code: Vec<u8>,
executor_params: ExecutorParams,
prep_timeout: Duration,
) -> Self {
let code = Arc::new(code); let code = Arc::new(code);
let code_hash = blake2_256(&code).into(); let code_hash = blake2_256(&code).into();
let executor_params = Arc::new(executor_params); let executor_params = Arc::new(executor_params);
Self { code, code_hash, executor_params } Self { code, code_hash, executor_params, prep_timeout }
} }
/// Returns artifact ID that corresponds to the PVF with given executor params /// Returns artifact ID that corresponds to the PVF with given executor params
@@ -67,27 +80,32 @@ impl PvfWithExecutorParams {
/// Creates a structure for tests /// Creates a structure for tests
#[cfg(test)] #[cfg(test)]
pub(crate) fn from_discriminator(num: u32) -> Self { pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self {
let descriminator_buf = num.to_le_bytes().to_vec(); let descriminator_buf = num.to_le_bytes().to_vec();
Self::from_code(descriminator_buf, ExecutorParams::default()) Self::from_code(descriminator_buf, ExecutorParams::default(), timeout)
}
#[cfg(test)]
pub(crate) fn from_discriminator(num: u32) -> Self {
Self::from_discriminator_and_timeout(num, TEST_PREPARATION_TIMEOUT)
} }
} }
impl fmt::Debug for PvfWithExecutorParams { impl fmt::Debug for PvfPrepData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!( write!(
f, f,
"Pvf {{ code, code_hash: {:?}, executor_params: {:?} }}", "Pvf {{ code, code_hash: {:?}, executor_params: {:?}, prep_timeout: {:?} }}",
self.code_hash, self.executor_params self.code_hash, self.executor_params, self.prep_timeout
) )
} }
} }
impl PartialEq for PvfWithExecutorParams { impl PartialEq for PvfPrepData {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.code_hash == other.code_hash && self.code_hash == other.code_hash &&
self.executor_params.hash() == other.executor_params.hash() self.executor_params.hash() == other.executor_params.hash()
} }
} }
impl Eq for PvfWithExecutorParams {} impl Eq for PvfPrepData {}
+4 -3
View File
@@ -17,8 +17,8 @@
use assert_matches::assert_matches; use assert_matches::assert_matches;
use parity_scale_codec::Encode as _; use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{ use polkadot_node_core_pvf::{
start, Config, InvalidCandidate, Metrics, PvfWithExecutorParams, ValidationError, start, Config, InvalidCandidate, Metrics, PvfPrepData, ValidationError, ValidationHost,
ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
}; };
use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams}; use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams};
@@ -30,6 +30,7 @@ mod worker_common;
const PUPPET_EXE: &str = env!("CARGO_BIN_EXE_puppet_worker"); const PUPPET_EXE: &str = env!("CARGO_BIN_EXE_puppet_worker");
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3);
struct TestHost { struct TestHost {
_cache_dir: tempfile::TempDir, _cache_dir: tempfile::TempDir,
@@ -69,7 +70,7 @@ impl TestHost {
.lock() .lock()
.await .await
.execute_pvf( .execute_pvf(
PvfWithExecutorParams::from_code(code.into(), executor_params), PvfPrepData::from_code(code.into(), executor_params, TEST_PREPARATION_TIMEOUT),
TEST_EXECUTION_TIMEOUT, TEST_EXECUTION_TIMEOUT,
params.encode(), params.encode(),
polkadot_node_core_pvf::Priority::Normal, polkadot_node_core_pvf::Priority::Normal,
@@ -32,7 +32,7 @@ use polkadot_overseer::{
gen::{FromOrchestra, SpawnedSubsystem}, gen::{FromOrchestra, SpawnedSubsystem},
HeadSupportsParachains, SubsystemError, HeadSupportsParachains, SubsystemError,
}; };
use polkadot_primitives::{CandidateReceipt, Hash}; use polkadot_primitives::{CandidateReceipt, Hash, PvfExecTimeoutKind};
struct AlwaysSupportsParachains; struct AlwaysSupportsParachains;
@@ -76,7 +76,7 @@ impl Subsystem1 {
let msg = CandidateValidationMessage::ValidateFromChainState( let msg = CandidateValidationMessage::ValidateFromChainState(
candidate_receipt, candidate_receipt,
PoV { block_data: BlockData(Vec::new()) }.into(), PoV { block_data: BlockData(Vec::new()) }.into(),
Default::default(), PvfExecTimeoutKind::Backing,
tx, tx,
); );
ctx.send_message(msg).await; ctx.send_message(msg).await;
+4 -4
View File
@@ -30,8 +30,8 @@ use polkadot_node_subsystem_types::{
ActivatedLeaf, LeafStatus, ActivatedLeaf, LeafStatus,
}; };
use polkadot_primitives::{ use polkadot_primitives::{
CandidateHash, CandidateReceipt, CollatorPair, InvalidDisputeStatementKind, SessionIndex, CandidateHash, CandidateReceipt, CollatorPair, InvalidDisputeStatementKind, PvfExecTimeoutKind,
ValidDisputeStatementKind, ValidatorIndex, SessionIndex, ValidDisputeStatementKind, ValidatorIndex,
}; };
use crate::{ use crate::{
@@ -106,7 +106,7 @@ where
ctx.send_message(CandidateValidationMessage::ValidateFromChainState( ctx.send_message(CandidateValidationMessage::ValidateFromChainState(
candidate_receipt, candidate_receipt,
PoV { block_data: BlockData(Vec::new()) }.into(), PoV { block_data: BlockData(Vec::new()) }.into(),
Default::default(), PvfExecTimeoutKind::Backing,
tx, tx,
)) ))
.await; .await;
@@ -779,7 +779,7 @@ fn test_candidate_validation_msg() -> CandidateValidationMessage {
CandidateValidationMessage::ValidateFromChainState( CandidateValidationMessage::ValidateFromChainState(
candidate_receipt, candidate_receipt,
pov, pov,
Duration::default(), PvfExecTimeoutKind::Backing,
sender, sender,
) )
} }
+1 -15
View File
@@ -22,7 +22,7 @@
#![deny(missing_docs)] #![deny(missing_docs)]
use std::{pin::Pin, time::Duration}; use std::pin::Pin;
use bounded_vec::BoundedVec; use bounded_vec::BoundedVec;
use futures::Future; use futures::Future;
@@ -64,20 +64,6 @@ pub const VALIDATION_CODE_BOMB_LIMIT: usize = (MAX_CODE_SIZE * 4u32) as usize;
/// The bomb limit for decompressing PoV blobs. /// The bomb limit for decompressing PoV blobs.
pub const POV_BOMB_LIMIT: usize = (MAX_POV_SIZE * 4u32) as usize; pub const POV_BOMB_LIMIT: usize = (MAX_POV_SIZE * 4u32) as usize;
/// The amount of time to spend on execution during backing.
pub const BACKING_EXECUTION_TIMEOUT: Duration = Duration::from_secs(2);
/// The amount of time to spend on execution during approval or disputes.
///
/// This is deliberately much longer than the backing execution timeout to
/// ensure that in the absence of extremely large disparities between hardware,
/// blocks that pass backing are considered executable by approval checkers or
/// dispute participants.
///
/// NOTE: If this value is increased significantly, also check the dispute coordinator to consider
/// candidates longer into finalization: `DISPUTE_CANDIDATE_LIFETIME_AFTER_FINALIZATION`.
pub const APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12);
/// How many blocks after finalization an information about backed/included candidate should be /// How many blocks after finalization an information about backed/included candidate should be
/// kept. /// kept.
/// ///
@@ -43,15 +43,14 @@ use polkadot_primitives::{
CandidateHash, CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CandidateHash, CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt,
CoreState, DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, CoreState, DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader,
Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet,
OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, PvfExecTimeoutKind,
SignedAvailabilityBitfield, SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash, SessionIndex, SessionInfo, SignedAvailabilityBitfield, SignedAvailabilityBitfields,
ValidatorId, ValidatorIndex, ValidatorSignature, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
}; };
use polkadot_statement_table::v2::Misbehavior; use polkadot_statement_table::v2::Misbehavior;
use std::{ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
sync::Arc, sync::Arc,
time::Duration,
}; };
/// Network events as transmitted to other subsystems, wrapped in their message types. /// Network events as transmitted to other subsystems, wrapped in their message types.
@@ -121,7 +120,7 @@ pub enum CandidateValidationMessage {
CandidateReceipt, CandidateReceipt,
Arc<PoV>, Arc<PoV>,
/// Execution timeout /// Execution timeout
Duration, PvfExecTimeoutKind,
oneshot::Sender<Result<ValidationResult, ValidationFailed>>, oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
), ),
/// Validate a candidate with provided, exhaustive parameters for validation. /// Validate a candidate with provided, exhaustive parameters for validation.
@@ -139,7 +138,7 @@ pub enum CandidateValidationMessage {
CandidateReceipt, CandidateReceipt,
Arc<PoV>, Arc<PoV>,
/// Execution timeout /// Execution timeout
Duration, PvfExecTimeoutKind,
oneshot::Sender<Result<ValidationResult, ValidationFailed>>, oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
), ),
/// Try to compile the given validation code and send back /// Try to compile the given validation code and send back
+10 -10
View File
@@ -46,16 +46,16 @@ pub use v2::{
Header, HrmpChannelId, Id, InboundDownwardMessage, InboundHrmpMessage, IndexedVec, Header, HrmpChannelId, Id, InboundDownwardMessage, InboundHrmpMessage, IndexedVec,
InherentData, InvalidDisputeStatementKind, Moment, MultiDisputeStatementSet, Nonce, InherentData, InvalidDisputeStatementKind, Moment, MultiDisputeStatementSet, Nonce,
OccupiedCore, OccupiedCoreAssumption, OutboundHrmpMessage, ParathreadClaim, ParathreadEntry, OccupiedCore, OccupiedCoreAssumption, OutboundHrmpMessage, ParathreadClaim, ParathreadEntry,
PersistedValidationData, PvfCheckStatement, RuntimeMetricLabel, RuntimeMetricLabelValue, PersistedValidationData, PvfCheckStatement, PvfExecTimeoutKind, PvfPrepTimeoutKind,
RuntimeMetricLabelValues, RuntimeMetricLabels, RuntimeMetricOp, RuntimeMetricUpdate, RuntimeMetricLabel, RuntimeMetricLabelValue, RuntimeMetricLabelValues, RuntimeMetricLabels,
ScheduledCore, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signature, Signed, RuntimeMetricOp, RuntimeMetricUpdate, ScheduledCore, ScrapedOnChainVotes, SessionIndex,
SignedAvailabilityBitfield, SignedAvailabilityBitfields, SignedStatement, SigningContext, Slot, SessionInfo, Signature, Signed, SignedAvailabilityBitfield, SignedAvailabilityBitfields,
UncheckedSigned, UncheckedSignedAvailabilityBitfield, UncheckedSignedAvailabilityBitfields, SignedStatement, SigningContext, Slot, UncheckedSigned, UncheckedSignedAvailabilityBitfield,
UncheckedSignedStatement, UpgradeGoAhead, UpgradeRestriction, UpwardMessage, UncheckedSignedAvailabilityBitfields, UncheckedSignedStatement, UpgradeGoAhead,
ValidDisputeStatementKind, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, UpgradeRestriction, UpwardMessage, ValidDisputeStatementKind, ValidationCode,
ValidatorSignature, ValidityAttestation, ValidityError, ASSIGNMENT_KEY_TYPE_ID, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
LOWEST_PUBLIC_ID, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, ValidityError, ASSIGNMENT_KEY_TYPE_ID, LOWEST_PUBLIC_ID, MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE,
PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID, MAX_POV_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
+27
View File
@@ -1689,6 +1689,33 @@ impl PvfCheckStatement {
} }
} }
/// Type discriminator for PVF preparation timeouts.
///
/// Paired with a millisecond value in `ExecutorParam::PvfPrepTimeout` and used as the lookup
/// key by `ExecutorParams::pvf_prep_timeout`.
///
/// NOTE: the variants carry no explicit `#[codec(index = ..)]`, so their SCALE-encoded
/// discriminants follow declaration order; reordering or inserting variants changes the
/// encoding.
#[derive(Encode, Decode, TypeInfo, Clone, Copy, Debug, PartialEq, Eq)]
pub enum PvfPrepTimeoutKind {
/// For prechecking requests, the time period after which the preparation worker is considered
/// unresponsive and will be killed.
Precheck,
/// For execution and heads-up requests, the time period after which the preparation worker is
/// considered unresponsive and will be killed. More lenient than the timeout for prechecking
/// to prevent honest validators from timing out on valid PVFs.
Lenient,
}
/// Type discriminator for PVF execution timeouts.
///
/// Paired with a millisecond value in `ExecutorParam::PvfExecTimeout` and used as the lookup
/// key by `ExecutorParams::pvf_exec_timeout`.
///
/// NOTE: the variants carry no explicit `#[codec(index = ..)]`, so their SCALE-encoded
/// discriminants follow declaration order; reordering or inserting variants changes the
/// encoding.
#[derive(Encode, Decode, TypeInfo, Clone, Copy, Debug, PartialEq, Eq)]
pub enum PvfExecTimeoutKind {
/// The amount of time to spend on execution during backing.
Backing,
/// The amount of time to spend on execution during approval or disputes.
///
/// This should be much longer than the backing execution timeout to ensure that in the
/// absence of extremely large disparities between hardware, blocks that pass backing are
/// considered executable by approval checkers or dispute participants.
Approval,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -21,24 +21,34 @@
//! by the first element of the vector). Decoding to a usable semantics structure is //! by the first element of the vector). Decoding to a usable semantics structure is
//! done in `polkadot-node-core-pvf`. //! done in `polkadot-node-core-pvf`.
use crate::{BlakeTwo256, HashT as _}; use crate::{BlakeTwo256, HashT as _, PvfExecTimeoutKind, PvfPrepTimeoutKind};
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use polkadot_core_primitives::Hash; use polkadot_core_primitives::Hash;
use scale_info::TypeInfo; use scale_info::TypeInfo;
use sp_std::{ops::Deref, vec, vec::Vec}; use sp_std::{ops::Deref, time::Duration, vec, vec::Vec};
/// The different executor parameters for changing the execution environment semantics. /// The different executor parameters for changing the execution environment semantics.
#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, TypeInfo)] #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, TypeInfo)]
pub enum ExecutorParam { pub enum ExecutorParam {
/// Maximum number of memory pages (64KiB bytes per page) the executor can allocate. /// Maximum number of memory pages (64KiB bytes per page) the executor can allocate.
#[codec(index = 1)]
MaxMemoryPages(u32), MaxMemoryPages(u32),
/// Wasm logical stack size limit (max. number of Wasm values on stack) /// Wasm logical stack size limit (max. number of Wasm values on stack)
#[codec(index = 2)]
StackLogicalMax(u32), StackLogicalMax(u32),
/// Executor machine stack size limit, in bytes /// Executor machine stack size limit, in bytes
#[codec(index = 3)]
StackNativeMax(u32), StackNativeMax(u32),
/// Max. amount of memory the preparation worker is allowed to use during /// Max. amount of memory the preparation worker is allowed to use during
/// pre-checking, in bytes /// pre-checking, in bytes
#[codec(index = 4)]
PrecheckingMaxMemory(u64), PrecheckingMaxMemory(u64),
/// PVF preparation timeout for the given timeout kind, in milliseconds
#[codec(index = 5)]
PvfPrepTimeout(PvfPrepTimeoutKind, u64),
/// PVF execution timeout for the given timeout kind, in milliseconds
#[codec(index = 6)]
PvfExecTimeout(PvfExecTimeoutKind, u64),
} }
/// Unit type wrapper around [`type@Hash`] that represents an execution parameter set hash. /// Unit type wrapper around [`type@Hash`] that represents an execution parameter set hash.
@@ -92,6 +102,30 @@ impl ExecutorParams {
pub fn hash(&self) -> ExecutorParamsHash { pub fn hash(&self) -> ExecutorParamsHash {
ExecutorParamsHash(BlakeTwo256::hash(&self.encode())) ExecutorParamsHash(BlakeTwo256::hash(&self.encode()))
} }
/// Looks up the PVF preparation timeout configured for `kind`.
///
/// Walks the parameter list in order and yields the first `ExecutorParam::PvfPrepTimeout`
/// entry whose kind equals `kind`, converting its millisecond value into a [`Duration`].
/// Returns `None` when the set holds no entry for that kind.
pub fn pvf_prep_timeout(&self, kind: PvfPrepTimeoutKind) -> Option<Duration> {
    self.0.iter().find_map(|entry| match entry {
        ExecutorParam::PvfPrepTimeout(entry_kind, millis) if *entry_kind == kind =>
            Some(Duration::from_millis(*millis)),
        _ => None,
    })
}
/// Looks up the PVF execution timeout configured for `kind`.
///
/// Walks the parameter list in order and yields the first `ExecutorParam::PvfExecTimeout`
/// entry whose kind equals `kind`, converting its millisecond value into a [`Duration`].
/// Returns `None` when the set holds no entry for that kind.
pub fn pvf_exec_timeout(&self, kind: PvfExecTimeoutKind) -> Option<Duration> {
    self.0.iter().find_map(|entry| match entry {
        ExecutorParam::PvfExecTimeout(entry_kind, millis) if *entry_kind == kind =>
            Some(Duration::from_millis(*millis)),
        _ => None,
    })
}
} }
impl Deref for ExecutorParams { impl Deref for ExecutorParams {