mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 08:11:03 +00:00
Retry failed PVF execution (AmbiguousWorkerDeath) (#6235)
* Fix a couple of typos
* Retry failed PVF execution
  PVF execution that fails due to AmbiguousWorkerDeath should be retried once. This should reduce the occurrence of failures due to transient conditions.
  Closes #6195
* Address a couple of nits
* Write tests; refactor (add `validate_candidate_with_retry`)
* Update node/core/candidate-validation/src/lib.rs
Co-authored-by: Andronik <write@reusable.software>
Co-authored-by: eskimor <eskimor@users.noreply.github.com>
Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Generated
+1
@@ -6428,6 +6428,7 @@ dependencies = [
|
|||||||
"assert_matches",
|
"assert_matches",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"futures",
|
"futures",
|
||||||
|
"futures-timer",
|
||||||
"parity-scale-codec",
|
"parity-scale-codec",
|
||||||
"polkadot-node-core-pvf",
|
"polkadot-node-core-pvf",
|
||||||
"polkadot-node-primitives",
|
"polkadot-node-primitives",
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ edition = "2021"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1.57"
|
async-trait = "0.1.57"
|
||||||
futures = "0.3.21"
|
futures = "0.3.21"
|
||||||
|
futures-timer = "3.0.2"
|
||||||
gum = { package = "tracing-gum", path = "../../gum" }
|
gum = { package = "tracing-gum", path = "../../gum" }
|
||||||
|
|
||||||
sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" }
|
sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||||
|
|||||||
@@ -60,6 +60,12 @@ mod tests;
|
|||||||
|
|
||||||
const LOG_TARGET: &'static str = "parachain::candidate-validation";
|
const LOG_TARGET: &'static str = "parachain::candidate-validation";
|
||||||
|
|
||||||
|
/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error.
|
||||||
|
#[cfg(not(test))]
|
||||||
|
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
|
||||||
|
#[cfg(test)]
|
||||||
|
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
|
||||||
|
|
||||||
/// Configuration for the candidate validation subsystem
|
/// Configuration for the candidate validation subsystem
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
@@ -490,7 +496,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn validate_candidate_exhaustive(
|
async fn validate_candidate_exhaustive(
|
||||||
mut validation_backend: impl ValidationBackend,
|
mut validation_backend: impl ValidationBackend + Send,
|
||||||
persisted_validation_data: PersistedValidationData,
|
persisted_validation_data: PersistedValidationData,
|
||||||
validation_code: ValidationCode,
|
validation_code: ValidationCode,
|
||||||
candidate_receipt: CandidateReceipt,
|
candidate_receipt: CandidateReceipt,
|
||||||
@@ -551,7 +557,7 @@ async fn validate_candidate_exhaustive(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let result = validation_backend
|
let result = validation_backend
|
||||||
.validate_candidate(raw_validation_code.to_vec(), timeout, params)
|
.validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if let Err(ref error) = result {
|
if let Err(ref error) = result {
|
||||||
@@ -604,45 +610,63 @@ async fn validate_candidate_exhaustive(
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
trait ValidationBackend {
|
trait ValidationBackend {
|
||||||
async fn validate_candidate(
|
async fn validate_candidate(
|
||||||
|
&mut self,
|
||||||
|
pvf: Pvf,
|
||||||
|
timeout: Duration,
|
||||||
|
encoded_params: Vec<u8>,
|
||||||
|
) -> Result<WasmValidationResult, ValidationError>;
|
||||||
|
|
||||||
|
async fn validate_candidate_with_retry(
|
||||||
&mut self,
|
&mut self,
|
||||||
raw_validation_code: Vec<u8>,
|
raw_validation_code: Vec<u8>,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
params: ValidationParams,
|
params: ValidationParams,
|
||||||
) -> Result<WasmValidationResult, ValidationError>;
|
) -> Result<WasmValidationResult, ValidationError> {
|
||||||
|
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
|
||||||
|
let pvf = Pvf::from_code(raw_validation_code);
|
||||||
|
|
||||||
|
let validation_result =
|
||||||
|
self.validate_candidate(pvf.clone(), timeout, params.encode()).await;
|
||||||
|
|
||||||
|
// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
|
||||||
|
// assumption that the conditions that caused this error may have been transient.
|
||||||
|
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
|
||||||
|
validation_result
|
||||||
|
{
|
||||||
|
// Wait a brief delay before retrying.
|
||||||
|
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
|
||||||
|
// Encode the params again when re-trying. We expect the retry case to be relatively
|
||||||
|
// rare, and we want to avoid unconditionally cloning data.
|
||||||
|
self.validate_candidate(pvf, timeout, params.encode()).await
|
||||||
|
} else {
|
||||||
|
validation_result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
|
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ValidationBackend for ValidationHost {
|
impl ValidationBackend for ValidationHost {
|
||||||
|
/// Tries executing a PVF a single time (no retries).
|
||||||
async fn validate_candidate(
|
async fn validate_candidate(
|
||||||
&mut self,
|
&mut self,
|
||||||
raw_validation_code: Vec<u8>,
|
pvf: Pvf,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
params: ValidationParams,
|
encoded_params: Vec<u8>,
|
||||||
) -> Result<WasmValidationResult, ValidationError> {
|
) -> Result<WasmValidationResult, ValidationError> {
|
||||||
|
let priority = polkadot_node_core_pvf::Priority::Normal;
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
if let Err(err) = self
|
if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await {
|
||||||
.execute_pvf(
|
|
||||||
Pvf::from_code(raw_validation_code),
|
|
||||||
timeout,
|
|
||||||
params.encode(),
|
|
||||||
polkadot_node_core_pvf::Priority::Normal,
|
|
||||||
tx,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
return Err(ValidationError::InternalError(format!(
|
return Err(ValidationError::InternalError(format!(
|
||||||
"cannot send pvf to the validation host: {:?}",
|
"cannot send pvf to the validation host: {:?}",
|
||||||
err
|
err
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
let validation_result = rx
|
rx.await
|
||||||
.await
|
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
|
||||||
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?;
|
|
||||||
|
|
||||||
validation_result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
|
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
|
||||||
|
|||||||
@@ -345,12 +345,19 @@ fn check_does_not_match() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct MockValidateCandidateBackend {
|
struct MockValidateCandidateBackend {
|
||||||
result: Result<WasmValidationResult, ValidationError>,
|
result_list: Vec<Result<WasmValidationResult, ValidationError>>,
|
||||||
|
num_times_called: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MockValidateCandidateBackend {
|
impl MockValidateCandidateBackend {
|
||||||
fn with_hardcoded_result(result: Result<WasmValidationResult, ValidationError>) -> Self {
|
fn with_hardcoded_result(result: Result<WasmValidationResult, ValidationError>) -> Self {
|
||||||
Self { result }
|
Self { result_list: vec![result], num_times_called: 0 }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_hardcoded_result_list(
|
||||||
|
result_list: Vec<Result<WasmValidationResult, ValidationError>>,
|
||||||
|
) -> Self {
|
||||||
|
Self { result_list, num_times_called: 0 }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -358,11 +365,16 @@ impl MockValidateCandidateBackend {
|
|||||||
impl ValidationBackend for MockValidateCandidateBackend {
|
impl ValidationBackend for MockValidateCandidateBackend {
|
||||||
async fn validate_candidate(
|
async fn validate_candidate(
|
||||||
&mut self,
|
&mut self,
|
||||||
_raw_validation_code: Vec<u8>,
|
_pvf: Pvf,
|
||||||
_timeout: Duration,
|
_timeout: Duration,
|
||||||
_params: ValidationParams,
|
_encoded_params: Vec<u8>,
|
||||||
) -> Result<WasmValidationResult, ValidationError> {
|
) -> Result<WasmValidationResult, ValidationError> {
|
||||||
self.result.clone()
|
// This is expected to panic if called more times than expected, indicating an error in the
|
||||||
|
// test.
|
||||||
|
let result = self.result_list[self.num_times_called].clone();
|
||||||
|
self.num_times_called += 1;
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
|
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
|
||||||
@@ -468,7 +480,7 @@ fn candidate_validation_bad_return_is_invalid() {
|
|||||||
|
|
||||||
let v = executor::block_on(validate_candidate_exhaustive(
|
let v = executor::block_on(validate_candidate_exhaustive(
|
||||||
MockValidateCandidateBackend::with_hardcoded_result(Err(
|
MockValidateCandidateBackend::with_hardcoded_result(Err(
|
||||||
ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath),
|
ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
|
||||||
)),
|
)),
|
||||||
validation_data,
|
validation_data,
|
||||||
validation_code,
|
validation_code,
|
||||||
@@ -479,6 +491,122 @@ fn candidate_validation_bad_return_is_invalid() {
|
|||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn candidate_validation_one_ambiguous_error_is_valid() {
|
||||||
|
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
|
||||||
|
|
||||||
|
let pov = PoV { block_data: BlockData(vec![1; 32]) };
|
||||||
|
let head_data = HeadData(vec![1, 1, 1]);
|
||||||
|
let validation_code = ValidationCode(vec![2; 16]);
|
||||||
|
|
||||||
|
let descriptor = make_valid_candidate_descriptor(
|
||||||
|
ParaId::from(1_u32),
|
||||||
|
dummy_hash(),
|
||||||
|
validation_data.hash(),
|
||||||
|
pov.hash(),
|
||||||
|
validation_code.hash(),
|
||||||
|
head_data.hash(),
|
||||||
|
dummy_hash(),
|
||||||
|
Sr25519Keyring::Alice,
|
||||||
|
);
|
||||||
|
|
||||||
|
let check = perform_basic_checks(
|
||||||
|
&descriptor,
|
||||||
|
validation_data.max_pov_size,
|
||||||
|
&pov,
|
||||||
|
&validation_code.hash(),
|
||||||
|
);
|
||||||
|
assert!(check.is_ok());
|
||||||
|
|
||||||
|
let validation_result = WasmValidationResult {
|
||||||
|
head_data,
|
||||||
|
new_validation_code: Some(vec![2, 2, 2].into()),
|
||||||
|
upward_messages: Vec::new(),
|
||||||
|
horizontal_messages: Vec::new(),
|
||||||
|
processed_downward_messages: 0,
|
||||||
|
hrmp_watermark: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let commitments = CandidateCommitments {
|
||||||
|
head_data: validation_result.head_data.clone(),
|
||||||
|
upward_messages: validation_result.upward_messages.clone(),
|
||||||
|
horizontal_messages: validation_result.horizontal_messages.clone(),
|
||||||
|
new_validation_code: validation_result.new_validation_code.clone(),
|
||||||
|
processed_downward_messages: validation_result.processed_downward_messages,
|
||||||
|
hrmp_watermark: validation_result.hrmp_watermark,
|
||||||
|
};
|
||||||
|
|
||||||
|
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
|
||||||
|
|
||||||
|
let v = executor::block_on(validate_candidate_exhaustive(
|
||||||
|
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
|
||||||
|
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
|
||||||
|
Ok(validation_result),
|
||||||
|
]),
|
||||||
|
validation_data.clone(),
|
||||||
|
validation_code,
|
||||||
|
candidate_receipt,
|
||||||
|
Arc::new(pov),
|
||||||
|
Duration::from_secs(0),
|
||||||
|
&Default::default(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
|
||||||
|
assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
|
||||||
|
assert_eq!(outputs.upward_messages, Vec::<UpwardMessage>::new());
|
||||||
|
assert_eq!(outputs.horizontal_messages, Vec::new());
|
||||||
|
assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
|
||||||
|
assert_eq!(outputs.hrmp_watermark, 0);
|
||||||
|
assert_eq!(used_validation_data, validation_data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
|
||||||
|
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
|
||||||
|
|
||||||
|
let pov = PoV { block_data: BlockData(vec![1; 32]) };
|
||||||
|
let validation_code = ValidationCode(vec![2; 16]);
|
||||||
|
|
||||||
|
let descriptor = make_valid_candidate_descriptor(
|
||||||
|
ParaId::from(1_u32),
|
||||||
|
dummy_hash(),
|
||||||
|
validation_data.hash(),
|
||||||
|
pov.hash(),
|
||||||
|
validation_code.hash(),
|
||||||
|
dummy_hash(),
|
||||||
|
dummy_hash(),
|
||||||
|
Sr25519Keyring::Alice,
|
||||||
|
);
|
||||||
|
|
||||||
|
let check = perform_basic_checks(
|
||||||
|
&descriptor,
|
||||||
|
validation_data.max_pov_size,
|
||||||
|
&pov,
|
||||||
|
&validation_code.hash(),
|
||||||
|
);
|
||||||
|
assert!(check.is_ok());
|
||||||
|
|
||||||
|
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
|
||||||
|
|
||||||
|
let v = executor::block_on(validate_candidate_exhaustive(
|
||||||
|
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
|
||||||
|
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
|
||||||
|
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
|
||||||
|
]),
|
||||||
|
validation_data,
|
||||||
|
validation_code,
|
||||||
|
candidate_receipt,
|
||||||
|
Arc::new(pov),
|
||||||
|
Duration::from_secs(0),
|
||||||
|
&Default::default(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_)));
|
assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -779,9 +907,9 @@ impl MockPreCheckBackend {
|
|||||||
impl ValidationBackend for MockPreCheckBackend {
|
impl ValidationBackend for MockPreCheckBackend {
|
||||||
async fn validate_candidate(
|
async fn validate_candidate(
|
||||||
&mut self,
|
&mut self,
|
||||||
_raw_validation_code: Vec<u8>,
|
_pvf: Pvf,
|
||||||
_timeout: Duration,
|
_timeout: Duration,
|
||||||
_params: ValidationParams,
|
_encoded_params: Vec<u8>,
|
||||||
) -> Result<WasmValidationResult, ValidationError> {
|
) -> Result<WasmValidationResult, ValidationError> {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -252,8 +252,8 @@ fn handle_job_finish(
|
|||||||
"execute worker concluded",
|
"execute worker concluded",
|
||||||
);
|
);
|
||||||
|
|
||||||
// First we send the result. It may fail due the other end of the channel being dropped, that's
|
// First we send the result. It may fail due to the other end of the channel being dropped,
|
||||||
// legitimate and we don't treat that as an error.
|
// that's legitimate and we don't treat that as an error.
|
||||||
let _ = result_tx.send(result);
|
let _ = result_tx.send(result);
|
||||||
|
|
||||||
// Then, we should deal with the worker:
|
// Then, we should deal with the worker:
|
||||||
@@ -305,7 +305,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
|
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
|
||||||
|
|
||||||
// Assume that the failure intermittent and retry after a delay.
|
// Assume that the failure is intermittent and retry after a delay.
|
||||||
Delay::new(Duration::from_secs(3)).await;
|
Delay::new(Duration::from_secs(3)).await;
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user