PVF: Don't dispute on missing artifact (#7011)

* PVF: Don't dispute on missing artifact

A dispute should never be raised if the local cache doesn't provide a certain
artifact. You can not dispute based on this reason, as it is a local hardware
issue and not related to the candidate to check.

Design:

Currently we assume that if we prepared an artifact, it remains there on-disk
until we prune it, i.e. we never check again if it's still there.

We can change it so that instead of artifact-not-found triggering a dispute, we
retry once (like we do for AmbiguousWorkerDeath, except we don't dispute if it
still doesn't work). And when enqueuing an execute job, we check for the
artifact on-disk, and start preparation if not found.

Changes:

- [x] Integration test (should fail without the following changes)
- [x] Check if artifact exists when executing, prepare if not
- [x] Return an internal error when file is missing
- [x] Retry once on internal errors
- [x] Document design (update impl guide)

* Add some context to wasm error message (it is quite long)

* Fix impl guide

* Add check for missing/inaccessible file

* Add comment referencing Substrate issue

* Add test for retrying internal errors

---------

Co-authored-by: parity-processbot <>
This commit is contained in:
Marcin S
2023-04-20 15:38:31 +02:00
committed by GitHub
parent 023d459857
commit 0940cdd1d7
8 changed files with 286 additions and 99 deletions
+1 -1
View File
@@ -23,5 +23,5 @@
mod queue;
mod worker;
pub use queue::{start, ToQueue};
pub use queue::{start, PendingExecutionRequest, ToQueue};
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
+14 -8
View File
@@ -50,13 +50,17 @@ slotmap::new_key_type! { struct Worker; }
#[derive(Debug)]
pub enum ToQueue {
Enqueue {
artifact: ArtifactPathId,
exec_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
},
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}
/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
pub struct PendingExecutionRequest {
pub exec_timeout: Duration,
pub params: Vec<u8>,
pub executor_params: ExecutorParams,
pub result_tx: ResultSender,
}
struct ExecuteJob {
@@ -259,7 +263,9 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, exec_timeout, params, executor_params, result_tx } = to_queue;
let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue;
let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } =
pending_execution_request;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
@@ -349,6 +349,13 @@ fn validate_using_artifact(
executor: Arc<Executor>,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
// TODO: Re-evaluate after <https://github.com/paritytech/substrate/issues/13860>.
let file_metadata = std::fs::metadata(artifact_path);
if let Err(err) = file_metadata {
return Response::format_internal("execute: could not find or open file", &err.to_string())
}
let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
+103 -68
View File
@@ -23,7 +23,7 @@
use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
error::PrepareError,
execute,
execute::{self, PendingExecutionRequest},
metrics::Metrics,
prepare, PrepareResult, Priority, PvfPrepData, ValidationError, LOG_TARGET,
};
@@ -33,7 +33,6 @@ use futures::{
Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_parachain::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
use std::{
collections::HashMap,
path::{Path, PathBuf},
@@ -249,36 +248,14 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
(validation_host, task)
}
/// An execution request that should execute the PVF (known in the context) and send the results
/// to the given result sender.
#[derive(Debug)]
struct PendingExecutionRequest {
exec_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
}
/// A mapping from an artifact ID which is in preparation state to the list of pending execution
/// requests that should be executed once the artifact's preparation is finished.
#[derive(Default)]
struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
impl AwaitingPrepare {
fn add(
&mut self,
artifact_id: ArtifactId,
exec_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
) {
self.0.entry(artifact_id).or_default().push(PendingExecutionRequest {
exec_timeout,
params,
executor_params,
result_tx,
});
fn add(&mut self, artifact_id: ArtifactId, pending_execution_request: PendingExecutionRequest) {
self.0.entry(artifact_id).or_default().push(pending_execution_request);
}
fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
@@ -475,6 +452,8 @@ async fn handle_precheck_pvf(
/// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs.
///
/// If the prepare job succeeded previously, we will enqueue an execute job right away.
///
/// If the prepare job failed previously, we may retry it under certain conditions.
///
/// When preparing for execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`])
@@ -489,32 +468,63 @@ async fn handle_execute_pvf(
) -> Result<(), Fatal> {
let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs;
let artifact_id = pvf.as_artifact_id();
let executor_params = (*pvf.executor_params()).clone();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = SystemTime::now();
let file_metadata = std::fs::metadata(artifact_id.path(cache_path));
// This artifact has already been prepared, send it to the execute queue.
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, cache_path),
exec_timeout,
params,
executor_params: (*pvf.executor_params()).clone(),
result_tx,
},
)
.await?;
if file_metadata.is_ok() {
*last_time_needed = SystemTime::now();
// This artifact has already been prepared, send it to the execute queue.
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, cache_path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
params,
executor_params,
result_tx,
},
},
)
.await?;
} else {
gum::warn!(
target: LOG_TARGET,
?pvf,
?artifact_id,
"handle_execute_pvf: Re-queuing PVF preparation for prepared artifact with missing file."
);
// The artifact has been prepared previously but the file is missing, prepare it again.
*state = ArtifactState::Preparing {
waiting_for_response: Vec::new(),
num_failures: 0,
};
enqueue_prepare_for_execute(
prepare_queue,
awaiting_prepare,
pvf,
priority,
artifact_id,
PendingExecutionRequest {
exec_timeout,
params,
executor_params,
result_tx,
},
)
.await?;
}
},
ArtifactState::Preparing { .. } => {
awaiting_prepare.add(
artifact_id,
exec_timeout,
params,
(*pvf.executor_params()).clone(),
result_tx,
PendingExecutionRequest { exec_timeout, params, executor_params, result_tx },
);
},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
@@ -535,19 +545,20 @@ async fn handle_execute_pvf(
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
let executor_params = (*pvf.executor_params()).clone();
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf })
.await?;
// Add an execution request that will wait to run after this prepare job has
// finished.
awaiting_prepare.add(
enqueue_prepare_for_execute(
prepare_queue,
awaiting_prepare,
pvf,
priority,
artifact_id,
exec_timeout,
params,
executor_params,
result_tx,
);
PendingExecutionRequest {
exec_timeout,
params,
executor_params,
result_tx,
},
)
.await?;
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
@@ -556,12 +567,16 @@ async fn handle_execute_pvf(
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF.
let executor_params = (*pvf.executor_params()).clone();
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
// Add an execution request that will wait to run after this prepare job has finished.
awaiting_prepare.add(artifact_id, exec_timeout, params, executor_params, result_tx);
enqueue_prepare_for_execute(
prepare_queue,
awaiting_prepare,
pvf,
priority,
artifact_id,
PendingExecutionRequest { exec_timeout, params, executor_params, result_tx },
)
.await?;
}
Ok(())
@@ -700,10 +715,12 @@ async fn handle_prepare_done(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
exec_timeout,
params,
executor_params,
result_tx,
pending_execution_request: PendingExecutionRequest {
exec_timeout,
params,
executor_params,
result_tx,
},
},
)
.await?;
@@ -745,6 +762,24 @@ async fn send_execute(
execute_queue.send(to_queue).await.map_err(|_| Fatal)
}
/// Sends a job to the preparation queue, and adds an execution request that will wait to run after
/// this prepare job has finished.
async fn enqueue_prepare_for_execute(
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
pvf: PvfPrepData,
priority: Priority,
artifact_id: ArtifactId,
pending_execution_request: PendingExecutionRequest,
) -> Result<(), Fatal> {
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
// Add an execution request that will wait to run after this prepare job has finished.
awaiting_prepare.add(artifact_id, pending_execution_request);
Ok(())
}
async fn handle_cleanup_pulse(
cache_path: &Path,
sweeper_tx: &mut mpsc::Sender<PathBuf>,
@@ -1125,11 +1160,11 @@ pub(crate) mod tests {
.unwrap();
let result_tx_pvf_1_1 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
let result_tx_pvf_1_2 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
test.from_prepare_queue_tx
@@ -1141,7 +1176,7 @@ pub(crate) mod tests {
.unwrap();
let result_tx_pvf_2 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
result_tx_pvf_1_1
@@ -1456,7 +1491,7 @@ pub(crate) mod tests {
// Preparation should have been retried and succeeded this time.
let result_tx_3 = assert_matches!(
test.poll_and_recv_to_execute_queue().await,
execute::ToQueue::Enqueue { result_tx, .. } => result_tx
execute::ToQueue::Enqueue { pending_execution_request: PendingExecutionRequest { result_tx, .. }, .. } => result_tx
);
// Send an error for the execution here, just so we can check the result receiver is still