Pvf refactor execute worker errors follow up (#4071)

follow up of https://github.com/paritytech/polkadot-sdk/pull/2604
closes https://github.com/paritytech/polkadot-sdk/pull/2604

- [x] take relevant changes from Marcin's PR 
- [x] extract common duplicate code for workers (low-hanging fruits)

~Some unpassed ci problems are more general and should be fixed in
master (see https://github.com/paritytech/polkadot-sdk/pull/4074)~

Proposed labels: **T0-node**, **R0-silent**, **I4-refactor**

-----

kusama address: FZXVQLqLbFV2otNXs6BMnNch54CFJ1idpWwjMb3Z8fTLQC6

---------

Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com>
This commit is contained in:
maksimryndin
2024-04-19 15:36:36 +02:00
committed by GitHub
parent eba3deca3e
commit 4eabe5e0dd
13 changed files with 331 additions and 285 deletions
+39 -16
View File
@@ -16,7 +16,7 @@
//! A queue that handles requests for PVF execution.
use super::worker_interface::Outcome;
use super::worker_interface::{Error as WorkerInterfaceError, Response as WorkerInterfaceResponse};
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
@@ -30,7 +30,10 @@ use futures::{
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use polkadot_node_core_pvf_common::SecurityStatus;
use polkadot_node_core_pvf_common::{
execute::{JobResponse, WorkerError, WorkerResponse},
SecurityStatus,
};
use polkadot_primitives::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap;
use std::{
@@ -133,7 +136,12 @@ impl Workers {
enum QueueEvent {
Spawn(IdleWorker, WorkerHandle, ExecuteJob),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
StartWork(
Worker,
Result<WorkerInterfaceResponse, WorkerInterfaceError>,
ArtifactId,
ResultSender,
),
}
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
@@ -340,23 +348,34 @@ fn handle_worker_spawned(
async fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
outcome: Outcome,
worker_result: Result<WorkerInterfaceResponse, WorkerInterfaceError>,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result, duration, sync_channel) = match outcome {
Outcome::Ok { result_descriptor, duration, idle_worker } => {
let (idle_worker, result, duration, sync_channel) = match worker_result {
Ok(WorkerInterfaceResponse {
worker_response:
WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration },
idle_worker,
}) => {
// TODO: propagate the soft timeout
(Some(idle_worker), Ok(result_descriptor), Some(duration), None)
},
Outcome::InvalidCandidate { err, idle_worker } => (
Ok(WorkerInterfaceResponse {
worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. },
idle_worker,
}) => (
Some(idle_worker),
Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))),
None,
None,
),
Outcome::RuntimeConstruction { err, idle_worker } => {
Ok(WorkerInterfaceResponse {
worker_response:
WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. },
idle_worker,
}) => {
// The task for artifact removal is executed concurrently with
// the message to the host on the execution result.
let (result_tx, result_rx) = oneshot::channel();
@@ -376,27 +395,31 @@ async fn handle_job_finish(
Some(result_rx),
)
},
Outcome::InternalError { err } => (None, Err(ValidationError::Internal(err)), None, None),
Err(WorkerInterfaceError::InternalError(err)) |
Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) =>
(None, Err(ValidationError::Internal(err)), None, None),
// Either the worker or the job timed out. Kill the worker in either case. Treated as
// definitely-invalid, because if we timed out, there's no time left for a retry.
Outcome::HardTimeout =>
Err(WorkerInterfaceError::HardTimeout) |
Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) =>
(None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None),
// "Maybe invalid" errors (will retry).
Outcome::WorkerIntfErr => (
Err(WorkerInterfaceError::CommunicationErr(_err)) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)),
None,
None,
),
Outcome::JobDied { err } => (
Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. })) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))),
None,
None,
),
Outcome::JobError { err } => (
Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => (
None,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))),
None,
None,
),
@@ -543,14 +566,14 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome = super::worker_interface::start_work(
let result = super::worker_interface::start_work(
idle,
job.artifact.clone(),
job.exec_timeout,
job.params,
)
.await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx)
}
.boxed(),
);
@@ -29,10 +29,9 @@ use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, WorkerResponse},
execute::{Handshake, WorkerError, WorkerResponse},
worker_dir, SecurityStatus,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
use std::{path::Path, time::Duration};
use tokio::{io, net::UnixStream};
@@ -69,7 +68,8 @@ pub async fn spawn(
gum::warn!(
target: LOG_TARGET,
worker_pid = %idle_worker.pid,
%err
"failed to send a handshake to the spawned worker: {}",
error
);
err
})?;
@@ -78,39 +78,40 @@ pub async fn spawn(
/// Outcome of PVF execution.
///
/// If the idle worker token is not returned, it means the worker must be terminated.
pub enum Outcome {
/// PVF execution completed successfully and the result is returned. The worker is ready for
/// another job.
Ok { result_descriptor: ValidationResult, duration: Duration, idle_worker: IdleWorker },
/// The candidate validation failed. It may be for example because the wasm execution triggered
/// a trap. Errors related to the preparation process are not expected to be encountered by the
/// execution workers.
InvalidCandidate { err: String, idle_worker: IdleWorker },
/// The error is probably transient. It may be for example
/// because the artifact was prepared with a Wasmtime version different from the version
/// in the current execution environment.
RuntimeConstruction { err: String, idle_worker: IdleWorker },
/// PVF execution completed and the result is returned. The worker is ready for
/// another job.
pub struct Response {
/// The response (valid/invalid) from the worker.
pub worker_response: WorkerResponse,
/// Returning the idle worker token means the worker can be reused.
pub idle_worker: IdleWorker,
}
/// The idle worker token is not returned for any of these cases, meaning the worker must be
/// terminated.
///
/// NOTE: Errors related to the preparation process are not expected to be encountered by the
/// execution workers.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The execution time exceeded the hard limit. The worker is terminated.
#[error("The communication with the worker exceeded the hard limit")]
HardTimeout,
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
WorkerIntfErr,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have caused this.
JobDied { err: String },
/// An unexpected error occurred in the job process.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error.
JobError { err: String },
#[error("An I/O error happened during communication with the worker: {0}")]
CommunicationErr(#[from] io::Error),
/// The worker reported an error (can be from itself or from the job). The worker should not be
/// reused.
#[error("The worker reported an error: {0}")]
WorkerError(#[from] WorkerError),
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
///
/// Should only ever be used for errors independent of the candidate and PVF. Therefore it may
/// be a problem with the worker, so we terminate it.
InternalError { err: InternalValidationError },
#[error("An internal error occurred: {0}")]
InternalError(#[from] InternalValidationError),
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -123,7 +124,7 @@ pub async fn start_work(
artifact: ArtifactPathId,
execution_timeout: Duration,
validation_params: Vec<u8>,
) -> Outcome {
) -> Result<Response, Error> {
let IdleWorker { mut stream, pid, worker_dir } = worker;
gum::debug!(
@@ -136,16 +137,18 @@ pub async fn start_work(
);
with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
if let Err(error) = send_request(&mut stream, &validation_params, execution_timeout).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to send an execute request",
);
return Outcome::WorkerIntfErr
}
send_request(&mut stream, &validation_params, execution_timeout).await.map_err(
|error| {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"failed to send an execute request: {}",
error,
);
Error::InternalError(InternalValidationError::HostCommunication(error.to_string()))
},
)?;
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
@@ -153,12 +156,12 @@ pub async fn start_work(
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let response = futures::select! {
response = recv_response(&mut stream).fuse() => {
match response {
Ok(response) =>
handle_response(
response,
let worker_result = futures::select! {
worker_result = recv_result(&mut stream).fuse() => {
match worker_result {
Ok(result) =>
handle_result(
result,
pid,
execution_timeout,
)
@@ -168,11 +171,11 @@ pub async fn start_work(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to recv an execute response",
"failed to recv an execute result: {}",
error,
);
return Outcome::WorkerIntfErr
return Err(Error::CommunicationErr(error))
},
}
},
@@ -183,29 +186,16 @@ pub async fn start_work(
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded lenient timeout for execution, child worker likely stalled",
);
WorkerResponse::JobTimedOut
return Err(Error::HardTimeout)
},
};
match response {
WorkerResponse::Ok { result_descriptor, duration } => Outcome::Ok {
result_descriptor,
duration,
match worker_result {
Ok(worker_response) => Ok(Response {
worker_response,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
WorkerResponse::InvalidCandidate(err) => Outcome::InvalidCandidate {
err,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
WorkerResponse::RuntimeConstruction(err) => Outcome::RuntimeConstruction {
err,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
WorkerResponse::JobTimedOut => Outcome::HardTimeout,
WorkerResponse::JobDied { err, job_pid: _ } => Outcome::JobDied { err },
WorkerResponse::JobError(err) => Outcome::JobError { err },
WorkerResponse::InternalError(err) => Outcome::InternalError { err },
}),
Err(worker_error) => Err(worker_error.into()),
}
})
.await
@@ -215,12 +205,12 @@ pub async fn start_work(
///
/// Here we know the artifact exists, but is still located in a temporary file which will be cleared
/// by [`with_worker_dir_setup`].
async fn handle_response(
response: WorkerResponse,
async fn handle_result(
worker_result: Result<WorkerResponse, WorkerError>,
worker_pid: u32,
execution_timeout: Duration,
) -> WorkerResponse {
if let WorkerResponse::Ok { duration, .. } = response {
) -> Result<WorkerResponse, WorkerError> {
if let Ok(WorkerResponse { duration, .. }) = worker_result {
if duration > execution_timeout {
// The job didn't complete within the timeout.
gum::warn!(
@@ -232,11 +222,11 @@ async fn handle_response(
);
// Return a timeout error.
return WorkerResponse::JobTimedOut
return Err(WorkerError::JobTimedOut)
}
}
response
worker_result
}
/// Create a temporary file for an artifact in the worker cache, execute the given future/closure
@@ -249,9 +239,9 @@ async fn with_worker_dir_setup<F, Fut>(
pid: u32,
artifact_path: &Path,
f: F,
) -> Outcome
) -> Result<Response, Error>
where
Fut: futures::Future<Output = Outcome>,
Fut: futures::Future<Output = Result<Response, Error>>,
F: FnOnce(WorkerDir) -> Fut,
{
// Cheaply create a hard link to the artifact. The artifact is always at a known location in the
@@ -263,16 +253,14 @@ where
target: LOG_TARGET,
worker_pid = %pid,
?worker_dir,
"failed to clear worker cache after the job: {:?}",
"failed to clear worker cache after the job: {}",
err,
);
return Outcome::InternalError {
err: InternalValidationError::CouldNotCreateLink(format!("{:?}", err)),
}
return Err(InternalValidationError::CouldNotCreateLink(format!("{:?}", err)).into());
}
let worker_dir_path = worker_dir.path().to_owned();
let outcome = f(worker_dir).await;
let result = f(worker_dir).await;
// Try to clear the worker dir.
if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
@@ -283,15 +271,14 @@ where
"failed to clear worker cache after the job: {:?}",
err,
);
return Outcome::InternalError {
err: InternalValidationError::CouldNotClearWorkerDir {
err: format!("{:?}", err),
path: worker_dir_path.to_str().map(String::from),
},
return Err(InternalValidationError::CouldNotClearWorkerDir {
err: format!("{:?}", err),
path: worker_dir_path.to_str().map(String::from),
}
.into())
}
outcome
result
}
/// Sends a handshake with information specific to the execute worker.
@@ -308,12 +295,12 @@ async fn send_request(
framed_send(stream, &execution_timeout.encode()).await
}
async fn recv_response(stream: &mut UnixStream) -> io::Result<WorkerResponse> {
let response_bytes = framed_recv(stream).await?;
WorkerResponse::decode(&mut response_bytes.as_slice()).map_err(|e| {
async fn recv_result(stream: &mut UnixStream) -> io::Result<Result<WorkerResponse, WorkerError>> {
let result_bytes = framed_recv(stream).await?;
Result::<WorkerResponse, WorkerError>::decode(&mut result_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("execute pvf recv_response: decode error: {:?}", e),
format!("execute pvf recv_result: decode error: {:?}", e),
)
})
}
+1 -4
View File
@@ -959,10 +959,7 @@ pub(crate) mod tests {
use crate::{artifacts::generate_artifact_path, PossiblyInvalidError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;
use polkadot_node_core_pvf_common::{
error::PrepareError,
prepare::{PrepareStats, PrepareSuccess},
};
use polkadot_node_core_pvf_common::prepare::PrepareStats;
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);