PVF: Vote invalid on panics in execution thread (after a retry) (#7155)

* PVF: Remove `rayon` and some uses of `tokio`

1. We were using `rayon` to spawn a superfluous thread to do execution, so it was removed.

2. We were using `rayon` to set a threadpool-specific thread stack size, and AFAIK we couldn't do that with `tokio` (it's possible [per-runtime](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.thread_stack_size) but not per-thread). Since we want to remove `tokio` from the workers [anyway](https://github.com/paritytech/polkadot/issues/7117), I changed it to spawn threads with the `std::thread` API instead of `tokio`.[^1]

[^1]: NOTE: This PR does not totally remove the `tokio` dependency just yet.

3. Since `std::thread` API is not async, we could no longer `select!` on the threads as futures, so the `select!` was changed to a naive loop.

4. The order of thread selection was flipped to make (3) sound (see note in code).

I left some TODO's related to panics which I'm going to address soon as part of https://github.com/paritytech/polkadot/issues/7045.

* PVF: Vote invalid on panics in execution thread (after a retry)

Also make sure we kill the worker process on panic errors and internal errors to
potentially clear any error states independent of the candidate.

* Address a couple of TODOs

Addresses a couple of follow-up TODOs from
https://github.com/paritytech/polkadot/pull/7153.

* Add some documentation to implementer's guide

* Fix compile error

* Fix compile errors

* Fix compile error

* Update roadmap/implementers-guide/src/node/utility/candidate-validation.md

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>

* Address comments + couple other changes (see message)

- Measure the CPU time in the prepare thread, so the observed time is not
  affected by any delays in joining on the thread.

- Measure the full CPU time in the execute thread.

* Implement proper thread synchronization

Use condvars i.e. `Arc::new((Mutex::new(true), Condvar::new()))` as per the std
docs.

Considered also using a condvar to signal the CPU thread to end, in place of an
mpsc channel. This was not done because `Condvar::wait_timeout_while` is
documented as being imprecise, and `mpsc::Receiver::recv_timeout` is not
documented as such. Also, we would need a separate condvar, to avoid this case:
the worker thread finishes its job, notifies the condvar, the CPU thread returns
first, and we join on it and not the worker thread. So it was simpler to leave
this part as is.

* Catch panics in threads so we always notify condvar

* Use `WaitOutcome` enum instead of bool condition variable

* Fix retry timeouts to depend on exec timeout kind

* Address review comments

* Make the API for condvars in workers nicer

* Add a doc

* Use condvar for memory stats thread

* Small refactor

* Enumerate internal validation errors in an enum

* Fix comment

* Add a log

* Fix test

* Update variant naming

* Address a missed TODO

---------

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
This commit is contained in:
Marcin S
2023-05-16 17:01:02 -04:00
committed by GitHub
parent b75b137b0f
commit 82e4dbcc2d
9 changed files with 236 additions and 61 deletions
@@ -24,8 +24,8 @@
#![warn(missing_docs)]
use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfPrepData,
ValidationError, ValidationHost,
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats,
PvfPrepData, ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
@@ -51,7 +51,11 @@ use parity_scale_codec::Encode;
use futures::{channel::oneshot, prelude::*};
use std::{path::PathBuf, sync::Arc, time::Duration};
use std::{
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
@@ -63,11 +67,19 @@ mod tests;
const LOG_TARGET: &'static str = "parachain::candidate-validation";
/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error.
/// The amount of time to wait before retrying after a retry-able backing validation error. We use a lower value for the
/// backing case, to fit within the lower backing timeout.
#[cfg(not(test))]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
const PVF_BACKING_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(500);
#[cfg(test)]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
const PVF_BACKING_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
/// The amount of time to wait before retrying after a retry-able approval validation error. We use a higher value for
/// the approval case since we have more time, and if we wait longer it is more likely that transient conditions will
/// resolve.
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
// Default PVF timeouts. Must never be changed! Use executor environment parameters in
// `session_info` pallet to adjust them. See also `PvfTimeoutKind` docs.
@@ -617,6 +629,7 @@ where
.validate_candidate_with_retry(
raw_validation_code.to_vec(),
pvf_exec_timeout(&executor_params, exec_timeout_kind),
exec_timeout_kind,
params,
executor_params,
)
@@ -627,7 +640,15 @@ where
}
match result {
Err(ValidationError::InternalError(e)) => Err(ValidationFailed(e)),
Err(ValidationError::InternalError(e)) => {
gum::warn!(
target: LOG_TARGET,
?para_id,
?e,
"An internal error occurred during validation, will abstain from voting",
);
Err(ValidationFailed(e.to_string()))
},
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) =>
@@ -636,6 +657,8 @@ where
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => {
// In principle if preparation of the `WASM` fails, the current candidate can not be the
// reason for that. So we can't say whether it is invalid or not. In addition, with
@@ -698,24 +721,44 @@ trait ValidationBackend {
&mut self,
raw_validation_code: Vec<u8>,
exec_timeout: Duration,
exec_timeout_kind: PvfExecTimeoutKind,
params: ValidationParams,
executor_params: ExecutorParams,
) -> Result<WasmValidationResult, ValidationError> {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepTimeoutKind::Lenient);
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = PvfPrepData::from_code(raw_validation_code, executor_params, prep_timeout);
// We keep track of the total time that has passed and stop retrying if we are taking too long.
let total_time_start = Instant::now();
let mut validation_result =
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
if validation_result.is_ok() {
return validation_result
}
let retry_delay = match exec_timeout_kind {
PvfExecTimeoutKind::Backing => PVF_BACKING_EXECUTION_RETRY_DELAY,
PvfExecTimeoutKind::Approval => PVF_APPROVAL_EXECUTION_RETRY_DELAY,
};
// Allow limited retries for each kind of error.
let mut num_internal_retries_left = 1;
let mut num_awd_retries_left = 1;
let mut num_panic_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
break
}
match validation_result {
Err(ValidationError::InvalidCandidate(
WasmInvalidCandidate::AmbiguousWorkerDeath,
)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_)))
if num_panic_retries_left > 0 =>
num_panic_retries_left -= 1,
Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
num_internal_retries_left -= 1,
_ => break,
@@ -725,11 +768,14 @@ trait ValidationBackend {
// that the conditions that caused this error may have resolved on their own.
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
futures_timer::Delay::new(retry_delay).await;
let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());
gum::warn!(
target: LOG_TARGET,
?pvf,
?new_timeout,
"Re-trying failed candidate validation due to possible transient error: {:?}",
validation_result
);
@@ -737,7 +783,7 @@ trait ValidationBackend {
// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
validation_result =
self.validate_candidate(pvf.clone(), exec_timeout, params.encode()).await;
self.validate_candidate(pvf.clone(), new_timeout, params.encode()).await;
}
}
@@ -760,14 +806,18 @@ impl ValidationBackend for ValidationHost {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.execute_pvf(pvf, exec_timeout, encoded_params, priority, tx).await {
return Err(ValidationError::InternalError(format!(
"cannot send pvf to the validation host: {:?}",
return Err(InternalValidationError::HostCommunication(format!(
"cannot send pvf to the validation host, it might have shut down: {:?}",
err
)))
))
.into())
}
rx.await
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
rx.await.map_err(|_| {
ValidationError::from(InternalValidationError::HostCommunication(
"validation was cancelled".into(),
))
})?
}
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
@@ -540,6 +540,7 @@ fn candidate_validation_bad_return_is_invalid() {
assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)));
}
// Test that we vote valid if we get `AmbiguousWorkerDeath`, retry, and then succeed.
#[test]
fn candidate_validation_one_ambiguous_error_is_valid() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
@@ -710,11 +711,11 @@ fn candidate_validation_retry_internal_errors() {
validate_candidate_exhaustive(
ctx.sender(),
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InternalError("foo".into())),
Err(InternalValidationError::HostCommunication("foo".into()).into()),
// Throw an AWD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
// Throw another internal error.
Err(ValidationError::InternalError("bar".into())),
Err(InternalValidationError::HostCommunication("bar".into()).into()),
]),
validation_data,
validation_code,
@@ -725,7 +726,63 @@ fn candidate_validation_retry_internal_errors() {
)
});
assert_matches!(v, Err(ValidationFailed(s)) if s == "bar".to_string());
assert_matches!(v, Err(ValidationFailed(s)) if s.contains("bar"));
}
// Test that we retry on panic errors.
#[test]
fn candidate_validation_retry_panic_errors() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };
let pov = PoV { block_data: BlockData(vec![1; 32]) };
let validation_code = ValidationCode(vec![2; 16]);
let descriptor = make_valid_candidate_descriptor(
ParaId::from(1_u32),
dummy_hash(),
validation_data.hash(),
pov.hash(),
validation_code.hash(),
dummy_hash(),
dummy_hash(),
Sr25519Keyring::Alice,
);
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
let pool = TaskExecutor::new();
let (mut ctx, ctx_handle) =
test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
let metrics = Metrics::default();
let v = test_with_executor_params(ctx_handle, || {
validate_candidate_exhaustive(
ctx.sender(),
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))),
// Throw an AWD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
// Throw another panic error.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))),
]),
validation_data,
validation_code,
candidate_receipt,
Arc::new(pov),
PvfExecTimeoutKind::Backing,
&metrics,
)
});
assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(s))) if s == "bar".to_string());
}
#[test]