change prepare worker to use fork instead of threads (#1685)

Co-authored-by: Marcin S <marcin@realemail.net>
This commit is contained in:
jserrat
2023-11-14 14:50:18 -03:00
committed by GitHub
parent 3a87390b30
commit 54f84285bf
24 changed files with 1468 additions and 534 deletions
@@ -642,14 +642,19 @@ async fn validate_candidate_exhaustive(
},
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedError(e))) =>
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(err))) =>
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
"ambiguous job death: {err}"
)))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::PrepareError(e))) => {
// In principle if preparation of the `WASM` fails, the current candidate can not be the
// reason for that. So we can't say whether it is invalid or not. In addition, with
@@ -741,9 +746,9 @@ trait ValidationBackend {
};
// Allow limited retries for each kind of error.
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_awd_retries_left = 1;
let mut num_panic_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
@@ -752,11 +757,12 @@ trait ValidationBackend {
match validation_result {
Err(ValidationError::InvalidCandidate(
WasmInvalidCandidate::AmbiguousWorkerDeath,
)) if num_awd_retries_left > 0 => num_awd_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic(_)))
if num_panic_retries_left > 0 =>
num_panic_retries_left -= 1,
WasmInvalidCandidate::AmbiguousWorkerDeath |
WasmInvalidCandidate::AmbiguousJobDeath(_),
)) if num_death_retries_left > 0 => num_death_retries_left -= 1,
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError(_)))
if num_job_error_retries_left > 0 =>
num_job_error_retries_left -= 1,
Err(ValidationError::InternalError(_)) if num_internal_retries_left > 0 =>
num_internal_retries_left -= 1,
_ => break,
@@ -695,11 +695,13 @@ fn candidate_validation_retry_panic_errors() {
let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("foo".into()))),
// Throw an AWD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("foo".into()))),
// Throw an AJD error, we should still retry again.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousJobDeath(
"baz".into(),
))),
// Throw another panic error.
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Panic("bar".into()))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::JobError("bar".into()))),
]),
validation_data,
validation_code,
@@ -1216,7 +1218,7 @@ fn precheck_properly_classifies_outcomes() {
inner(Err(PrepareError::Prevalidation("foo".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::Preparation("bar".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::JobError("baz".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
@@ -37,7 +37,7 @@ impl TestHost {
where
F: FnOnce(&mut Config),
{
let (prepare_worker_path, execute_worker_path) = testing::get_and_check_worker_paths();
let (prepare_worker_path, execute_worker_path) = testing::build_workers_and_get_paths(true);
let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
+1 -1
View File
@@ -12,6 +12,7 @@ cpu-time = "1.0.0"
futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
thiserror = "1.0.31"
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
@@ -30,7 +31,6 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }
[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
seccompiler = "0.4.0"
thiserror = "1.0.31"
[dev-dependencies]
assert_matches = "1.4.0"
+27 -28
View File
@@ -35,9 +35,9 @@ pub enum PrepareError {
/// Instantiation of the WASM module instance failed.
#[codec(index = 2)]
RuntimeConstruction(String),
/// An unexpected panic has occurred in the preparation worker.
/// An unexpected error has occurred in the preparation job.
#[codec(index = 3)]
Panic(String),
JobError(String),
/// Failed to prepare the PVF due to the time limit.
#[codec(index = 4)]
TimedOut,
@@ -48,12 +48,12 @@ pub enum PrepareError {
/// The temporary file for the artifact could not be created at the given cache path. This
/// state is reported by the validation host (not by the worker).
#[codec(index = 6)]
CreateTmpFileErr(String),
CreateTmpFile(String),
/// The response from the worker is received, but the file cannot be renamed (moved) to the
/// final destination location. This state is reported by the validation host (not by the
/// worker).
#[codec(index = 7)]
RenameTmpFileErr {
RenameTmpFile {
err: String,
// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
// conversion to `Option<String>`.
@@ -68,11 +68,14 @@ pub enum PrepareError {
/// reported by the validation host (not by the worker).
#[codec(index = 9)]
ClearWorkerDir(String),
/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
JobDied(String),
#[codec(index = 10)]
/// Some error occurred when interfacing with the kernel.
#[codec(index = 11)]
Kernel(String),
}
/// Pre-encoded length-prefixed `PrepareResult::Err(PrepareError::OutOfMemory)`
pub const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
impl PrepareError {
/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
@@ -83,12 +86,15 @@ impl PrepareError {
pub fn is_deterministic(&self) -> bool {
use PrepareError::*;
match self {
Prevalidation(_) | Preparation(_) | Panic(_) | OutOfMemory => true,
TimedOut |
Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true,
IoErr(_) |
CreateTmpFileErr(_) |
RenameTmpFileErr { .. } |
ClearWorkerDir(_) => false,
JobDied(_) |
CreateTmpFile(_) |
RenameTmpFile { .. } |
ClearWorkerDir(_) |
Kernel(_) => false,
// Can occur due to issues with the PVF, but also due to factors like local load.
TimedOut => false,
// Can occur due to issues with the PVF, but also due to local errors.
RuntimeConstruction(_) => false,
}
@@ -102,14 +108,16 @@ impl fmt::Display for PrepareError {
Prevalidation(err) => write!(f, "prevalidation: {}", err),
Preparation(err) => write!(f, "preparation: {}", err),
RuntimeConstruction(err) => write!(f, "runtime construction: {}", err),
Panic(err) => write!(f, "panic: {}", err),
JobError(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"),
IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr { err, src, dest } =>
JobDied(err) => write!(f, "prepare: prepare job died: {}", err),
CreateTmpFile(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFile { err, src, dest } =>
write!(f, "prepare: error renaming tmp file ({:?} -> {:?}): {}", src, dest, err),
OutOfMemory => write!(f, "prepare: out of memory"),
ClearWorkerDir(err) => write!(f, "prepare: error clearing worker cache: {}", err),
Kernel(err) => write!(f, "prepare: error interfacing with the kernel: {}", err),
}
}
}
@@ -133,9 +141,9 @@ pub enum InternalValidationError {
// conversion to `Option<String>`.
path: Option<String>,
},
/// An error occurred in the CPU time monitor thread. Should be totally unrelated to
/// validation.
CpuTimeMonitorThread(String),
/// Some error occurred when interfacing with the kernel.
Kernel(String),
/// Some non-deterministic preparation error occurred.
NonDeterministicPrepareError(PrepareError),
}
@@ -158,17 +166,8 @@ impl fmt::Display for InternalValidationError {
"validation: host could not clear the worker cache ({:?}) after a job: {}",
path, err
),
CpuTimeMonitorThread(err) =>
write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err),
Kernel(err) => write!(f, "validation: error interfacing with the kernel: {}", err),
NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err),
}
}
}
#[test]
fn pre_encoded_payloads() {
let oom_enc = PrepareResult::Err(PrepareError::OutOfMemory).encode();
let mut oom_payload = oom_enc.len().to_le_bytes().to_vec();
oom_payload.extend(oom_enc);
assert_eq!(oom_payload, OOM_PAYLOAD);
}
+45 -6
View File
@@ -28,9 +28,9 @@ pub struct Handshake {
pub executor_params: ExecutorParams,
}
/// The response from an execution job on the worker.
/// The response from the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum Response {
pub enum WorkerResponse {
/// The job completed successfully.
Ok {
/// The result of parachain validation.
@@ -41,14 +41,38 @@ pub enum Response {
/// The candidate is invalid.
InvalidCandidate(String),
/// The job timed out.
TimedOut,
/// An unexpected panic has occurred in the execution worker.
Panic(String),
JobTimedOut,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious.
JobDied(String),
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious.
JobError(String),
/// Some internal error occurred.
InternalError(InternalValidationError),
}
impl Response {
/// The result of a job on the execution worker.
pub type JobResult = Result<JobResponse, JobError>;
/// The successful response from a job on the execution worker.
#[derive(Debug, Encode, Decode)]
pub enum JobResponse {
Ok {
/// The result of parachain validation.
result_descriptor: ValidationResult,
},
/// The candidate is invalid.
InvalidCandidate(String),
}
impl JobResponse {
/// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
@@ -58,3 +82,18 @@ impl Response {
}
}
}
/// An unexpected error occurred in the execution job process. Because this comes from the job,
/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
/// cannot raise an internal error based on this.
#[derive(thiserror::Error, Debug, Encode, Decode)]
pub enum JobError {
#[error("The job timed out")]
TimedOut,
#[error("An unexpected panic has occurred in the execution job: {0}")]
Panic(String),
#[error("Could not spawn the requested thread: {0}")]
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
}
@@ -205,9 +205,15 @@ impl fmt::Display for WorkerKind {
}
}
// The worker version must be passed in so that we accurately get the version of the worker, and not
// the version that this crate was compiled with.
pub fn worker_event_loop<F>(
// NOTE: The worker version must be passed in so that we accurately get the version of the worker,
// and not the version that this crate was compiled with.
//
// NOTE: This must not spawn any threads due to safety requirements in `event_loop` and to avoid
// errors in [`security::unshare_user_namespace_and_change_root`].
//
/// Initializes the worker process, then runs the given event loop, which spawns a new job process
/// to securely handle each incoming request.
pub fn run_worker<F>(
worker_kind: WorkerKind,
socket_path: PathBuf,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
@@ -9,6 +9,9 @@ license.workspace = true
[dependencies]
cpu-time = "1.0.0"
gum = { package = "tracing-gum", path = "../../../gum" }
os_pipe = "1.1.4"
nix = { version = "0.27.1", features = ["resource", "process"]}
libc = "0.2.139"
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
+313 -86
View File
@@ -25,23 +25,33 @@ pub use polkadot_node_core_pvf_common::{
const LOG_TARGET: &str = "parachain::pvf-execute-worker";
use cpu_time::ProcessTime;
use nix::{
errno::Errno,
sys::{
resource::{Usage, UsageWho},
wait::WaitStatus,
},
unistd::{ForkResult, Pid},
};
use os_pipe::{self, PipeReader, PipeWriter};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, Response},
execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse},
framed_recv_blocking, framed_send_blocking,
worker::{
cpu_time_monitor_loop, stringify_panic_payload,
cpu_time_monitor_loop, run_worker, stringify_panic_payload,
thread::{self, WaitOutcome},
worker_event_loop, WorkerKind,
WorkerKind,
},
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::{executor_params::DEFAULT_NATIVE_STACK_MAX, ExecutorParams};
use std::{
io,
io::{self, Read},
os::unix::net::UnixStream,
path::PathBuf,
process,
sync::{mpsc::channel, Arc},
time::Duration,
};
@@ -105,7 +115,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> {
Ok((params, execution_timeout))
}
fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Result<()> {
framed_send_blocking(stream, &response.encode())
}
@@ -131,7 +141,7 @@ pub fn worker_entrypoint(
worker_version: Option<&str>,
security_status: SecurityStatus,
) {
worker_event_loop(
run_worker(
WorkerKind::Execute,
socket_path,
worker_dir_path,
@@ -139,7 +149,7 @@ pub fn worker_entrypoint(
worker_version,
&security_status,
|mut stream, worker_dir_path| {
let worker_pid = std::process::id();
let worker_pid = process::id();
let artifact_path = worker_dir::execute_artifact(&worker_dir_path);
let Handshake { executor_params } = recv_handshake(&mut stream)?;
@@ -157,7 +167,7 @@ pub fn worker_entrypoint(
let compiled_artifact_blob = match std::fs::read(&artifact_path) {
Ok(bytes) => bytes,
Err(err) => {
let response = Response::InternalError(
let response = WorkerResponse::InternalError(
InternalValidationError::CouldNotOpenFile(err.to_string()),
);
send_response(&mut stream, response)?;
@@ -165,82 +175,51 @@ pub fn worker_entrypoint(
},
};
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
let cpu_time_start = ProcessTime::now();
let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
Err(errno) => {
let response = internal_error_from_errno("getrusage before", errno);
send_response(&mut stream, response)?;
continue
},
};
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn_worker_thread(
"cpu time monitor thread",
move || {
cpu_time_monitor_loop(
cpu_time_start,
// SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests.
let response = match unsafe { nix::unistd::fork() } {
Err(errno) => internal_error_from_errno("fork", errno),
Ok(ForkResult::Child) => {
// Dropping the stream closes the underlying socket. We want to make sure
// that the sandboxed child can't get any kind of information from the
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
drop(stream);
// Drop the read end so we don't have too many FDs open.
drop(pipe_reader);
handle_child_process(
pipe_writer,
compiled_artifact_blob,
executor_params,
params,
execution_timeout,
cpu_time_monitor_rx,
)
},
Arc::clone(&condvar),
WaitOutcome::TimedOut,
)?;
Ok(ForkResult::Parent { child }) => {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
drop(pipe_writer);
let executor_params_2 = executor_params.clone();
let execute_thread = thread::spawn_worker_thread_with_stack_size(
"execute thread",
move || {
validate_using_artifact(
&compiled_artifact_blob,
&executor_params_2,
&params,
cpu_time_start,
)
handle_parent_process(
pipe_reader,
child,
worker_pid,
usage_before,
execution_timeout,
)?
},
Arc::clone(&condvar),
WaitOutcome::Finished,
EXECUTE_THREAD_STACK_SIZE,
)?;
let outcome = thread::wait_for_threads(condvar);
let response = match outcome {
WaitOutcome::Finished => {
let _ = cpu_time_monitor_tx.send(());
execute_thread
.join()
.unwrap_or_else(|e| Response::Panic(stringify_panic_payload(e)))
},
// If the CPU thread is not selected, we signal it to end, the join handle is
// dropped and the thread will finish in the background.
WaitOutcome::TimedOut => {
match cpu_time_monitor_thread.join() {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't
// finished.
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
);
Response::TimedOut
},
Ok(None) => Response::InternalError(
InternalValidationError::CpuTimeMonitorThread(
"error communicating over finished channel".into(),
),
),
Err(e) => Response::InternalError(
InternalValidationError::CpuTimeMonitorThread(
stringify_panic_payload(e),
),
),
}
},
WaitOutcome::Pending => unreachable!(
"we run wait_while until the outcome is no longer pending; qed"
),
};
gum::trace!(
@@ -259,27 +238,275 @@ fn validate_using_artifact(
compiled_artifact_blob: &[u8],
executor_params: &ExecutorParams,
params: &[u8],
cpu_time_start: ProcessTime,
) -> Response {
) -> JobResponse {
let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
// [`executor_intf::prepare`].
execute_artifact(compiled_artifact_blob, executor_params, params)
} {
Err(err) => return Response::format_invalid("execute", &err),
Err(err) => return JobResponse::format_invalid("execute", &err),
Ok(d) => d,
};
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) =>
return Response::format_invalid("validation result decoding failed", &err.to_string()),
return JobResponse::format_invalid(
"validation result decoding failed",
&err.to_string(),
),
Ok(r) => r,
};
// Include the decoding in the measured time, to prevent any potential attacks exploiting some
// bug in decoding.
let duration = cpu_time_start.elapsed();
Response::Ok { result_descriptor, duration }
JobResponse::Ok { result_descriptor }
}
/// This is used to handle child process during pvf execute worker.
/// It execute the artifact and pipes back the response to the parent process
///
/// # Arguments
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
///
/// - `compiled_artifact_blob`: The artifact bytes from compiled by the prepare worker`.
///
/// - `executor_params`: Deterministically serialized execution environment semantics.
///
/// - `params`: Validation parameters.
///
/// - `execution_timeout`: The timeout in `Duration`.
///
/// # Returns
///
/// - pipe back `JobResponse` to the parent process.
fn handle_child_process(
mut pipe_write: PipeWriter,
compiled_artifact_blob: Vec<u8>,
executor_params: ExecutorParams,
params: Vec<u8>,
execution_timeout: Duration,
) -> ! {
gum::debug!(
target: LOG_TARGET,
worker_job_pid = %process::id(),
"worker job: executing artifact",
);
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn_worker_thread(
"cpu time monitor thread",
move || cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx),
Arc::clone(&condvar),
WaitOutcome::TimedOut,
)
.unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
});
let executor_params_2 = executor_params.clone();
let execute_thread = thread::spawn_worker_thread_with_stack_size(
"execute thread",
move || validate_using_artifact(&compiled_artifact_blob, &executor_params_2, &params),
Arc::clone(&condvar),
WaitOutcome::Finished,
EXECUTE_THREAD_STACK_SIZE,
)
.unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
});
let outcome = thread::wait_for_threads(condvar);
let response = match outcome {
WaitOutcome::Finished => {
let _ = cpu_time_monitor_tx.send(());
execute_thread.join().map_err(|e| JobError::Panic(stringify_panic_payload(e)))
},
// If the CPU thread is not selected, we signal it to end, the join handle is
// dropped and the thread will finish in the background.
WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
Ok(Some(_cpu_time_elapsed)) => Err(JobError::TimedOut),
Ok(None) => Err(JobError::CpuTimeMonitorThread(
"error communicating over finished channel".into(),
)),
Err(e) => Err(JobError::CpuTimeMonitorThread(stringify_panic_payload(e))),
},
WaitOutcome::Pending =>
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
};
send_child_response(&mut pipe_write, response);
}
/// Waits for child process to finish and handle child response from pipe.
///
/// # Arguments
///
/// - `pipe_read`: A `PipeReader` used to read data from the child process.
///
/// - `child`: The child pid.
///
/// - `usage_before`: Resource usage statistics before executing the child process.
///
/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`.
///
/// # Returns
///
/// - The response, either `Ok` or some error state.
fn handle_parent_process(
mut pipe_read: PipeReader,
child: Pid,
worker_pid: u32,
usage_before: Usage,
timeout: Duration,
) -> io::Result<WorkerResponse> {
// Read from the child. Don't decode unless the process exited normally, which we check later.
let mut received_data = Vec::new();
pipe_read
.read_to_end(&mut received_data)
// Could not decode job response. There is either a bug or the job was hijacked.
// Should retry at any rate.
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
let status = nix::sys::wait::waitpid(child, None);
gum::trace!(
target: LOG_TARGET,
%worker_pid,
"execute worker received wait status from job: {:?}",
status,
);
let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
Err(errno) => return Ok(internal_error_from_errno("getrusage after", errno)),
};
// Using `getrusage` is needed to check whether child has timedout since we cannot rely on
// child to report its own time.
// As `getrusage` returns resource usage from all terminated child processes,
// it is necessary to subtract the usage before the current child process to isolate its cpu
// time
let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_tv.as_millis(),
timeout.as_millis(),
);
return Ok(WorkerResponse::JobTimedOut)
}
match status {
Ok(WaitStatus::Exited(_, exit_status)) => {
let mut reader = io::BufReader::new(received_data.as_slice());
let result = match recv_child_response(&mut reader) {
Ok(result) => result,
Err(err) => return Ok(WorkerResponse::JobError(err.to_string())),
};
match result {
Ok(JobResponse::Ok { result_descriptor }) => {
// The exit status should have been zero if no error occurred.
if exit_status != 0 {
return Ok(WorkerResponse::JobError(format!(
"unexpected exit status: {}",
exit_status
)))
}
Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv })
},
Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
Err(job_error) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"execute job error: {}",
job_error,
);
if matches!(job_error, JobError::TimedOut) {
Ok(WorkerResponse::JobTimedOut)
} else {
Ok(WorkerResponse::JobError(job_error.to_string()))
}
},
}
},
// The job was killed by the given signal.
//
// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
// other reason, so we still need to check for seccomp violations elsewhere.
Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) =>
Ok(WorkerResponse::JobDied(format!("received signal: {signal:?}"))),
Err(errno) => Ok(internal_error_from_errno("waitpid", errno)),
// It is within an attacker's power to send an unexpected exit status. So we cannot treat
// this as an internal error (which would make us abstain), but must vote against.
Ok(unexpected_wait_status) => Ok(WorkerResponse::JobDied(format!(
"unexpected status from wait: {unexpected_wait_status:?}"
))),
}
}
/// Calculate the total CPU time from the given `usage` structure, returned from
/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
/// and system time.
///
/// # Arguments
///
/// - `rusage`: Contains resource usage information.
///
/// # Returns
///
/// Returns a `Duration` representing the total CPU time.
fn get_total_cpu_usage(rusage: Usage) -> Duration {
let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
return Duration::from_micros(micros)
}
/// Get a job response.
fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<JobResult> {
let response_bytes = framed_recv_blocking(received_data)?;
JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("execute pvf recv_child_response: decode error: {:?}", e),
)
})
}
/// Write response to the pipe and exit process after.
///
/// # Arguments
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
///
/// - `response`: Child process response, or error.
fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
framed_send_blocking(pipe_write, response.encode().as_slice())
.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
if response.is_ok() {
process::exit(libc::EXIT_SUCCESS)
} else {
process::exit(libc::EXIT_FAILURE)
}
}
fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerResponse {
WorkerResponse::InternalError(InternalValidationError::Kernel(format!(
"{}: {}: {}",
context,
errno,
io::Error::last_os_error()
)))
}
@@ -14,6 +14,8 @@ rayon = "1.5.1"
tracking-allocator = { package = "staging-tracking-allocator", path = "../../../tracking-allocator" }
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tikv-jemallocator = { version = "0.5.0", optional = true }
os_pipe = "1.1.4"
nix = { version = "0.27.1", features = ["resource", "process"]}
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
+445 -187
View File
@@ -28,28 +28,40 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use libc;
use nix::{
errno::Errno,
sys::{
resource::{Usage, UsageWho},
wait::WaitStatus,
},
unistd::{ForkResult, Pid},
};
use os_pipe::{self, PipeReader, PipeWriter};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult, OOM_PAYLOAD},
error::{PrepareError, PrepareResult},
executor_intf::create_runtime_from_artifact_bytes,
framed_recv_blocking, framed_send_blocking,
prepare::{MemoryStats, PrepareJobKind, PrepareStats},
pvf::PvfPrepData,
worker::{
cpu_time_monitor_loop, stringify_panic_payload,
thread::{self, WaitOutcome},
worker_event_loop, WorkerKind,
cpu_time_monitor_loop, run_worker, stringify_panic_payload,
thread::{self, spawn_worker_thread, WaitOutcome},
WorkerKind,
},
worker_dir, ProcessTime, SecurityStatus,
};
use polkadot_primitives::ExecutorParams;
use std::{
fs, io,
fs,
io::{self, Read},
os::{
fd::{AsRawFd, RawFd},
unix::net::UnixStream,
},
path::PathBuf,
process,
sync::{mpsc::channel, Arc},
time::Duration,
};
@@ -65,6 +77,7 @@ static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
/// Contains the bytes for a successfully compiled artifact.
#[derive(Encode, Decode)]
pub struct CompiledArtifact(Vec<u8>);
impl CompiledArtifact {
@@ -80,6 +93,7 @@ impl AsRef<[u8]> for CompiledArtifact {
}
}
/// Get a worker request.
fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
let pvf = framed_recv_blocking(stream)?;
let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
@@ -91,6 +105,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
Ok(pvf)
}
/// Send a worker response.
fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
framed_send_blocking(stream, &result.encode())
}
@@ -111,18 +126,22 @@ fn start_memory_tracking(fd: RawFd, limit: Option<isize>) {
// Syscalls never allocate or deallocate, so this is safe.
libc::syscall(libc::SYS_write, fd, OOM_PAYLOAD.as_ptr(), OOM_PAYLOAD.len());
libc::syscall(libc::SYS_close, fd);
libc::syscall(libc::SYS_exit, 1);
// Make sure we exit from all threads. Copied from glibc.
libc::syscall(libc::SYS_exit_group, 1);
loop {
libc::syscall(libc::SYS_exit, 1);
}
}
#[cfg(not(target_os = "linux"))]
{
// Syscalls are not available on MacOS, so we have to use `libc` wrappers.
// Technicaly, there may be allocations inside, although they shouldn't be
// Technically, there may be allocations inside, although they shouldn't be
// there. In that case, we'll see deadlocks on MacOS after the OOM condition
// triggered. As we consider running a validator on MacOS unsafe, and this
// code is only run by a validator, it's a lesser evil.
libc::write(fd, OOM_PAYLOAD.as_ptr().cast(), OOM_PAYLOAD.len());
libc::close(fd);
std::process::exit(1);
libc::_exit(1);
}
})),
);
@@ -155,17 +174,19 @@ fn end_memory_tracking() -> isize {
///
/// 1. Get the code and parameters for preparation from the host.
///
/// 2. Start a memory tracker in a separate thread.
/// 2. Start a new child process
///
/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
/// 3. Start the memory tracker and the actual preparation in two separate threads.
///
/// 4. Wait on the two threads created in step 3.
///
/// 5. Stop the memory tracker and get the stats.
///
/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
/// 6. Pipe the result back to the parent process and exit from child process.
///
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// 7. If compilation succeeded, write the compiled artifact into a temporary file.
///
/// 8. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(
socket_path: PathBuf,
@@ -174,7 +195,7 @@ pub fn worker_entrypoint(
worker_version: Option<&str>,
security_status: SecurityStatus,
) {
worker_event_loop(
run_worker(
WorkerKind::Prepare,
socket_path,
worker_dir_path,
@@ -182,7 +203,7 @@ pub fn worker_entrypoint(
worker_version,
&security_status,
|mut stream, worker_dir_path| {
let worker_pid = std::process::id();
let worker_pid = process::id();
let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path);
loop {
@@ -197,186 +218,58 @@ pub fn worker_entrypoint(
let prepare_job_kind = pvf.prep_kind();
let executor_params = pvf.executor_params();
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
// Run the memory tracker in a regular, non-worker thread.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let condvar_memory = Arc::clone(&condvar);
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn_worker_thread(
"cpu time monitor thread",
move || {
cpu_time_monitor_loop(
cpu_time_start,
preparation_timeout,
cpu_time_monitor_rx,
)
let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
Err(errno) => {
let result = Err(error_from_errno("getrusage before", errno));
send_response(&mut stream, result)?;
continue
},
Arc::clone(&condvar),
WaitOutcome::TimedOut,
)?;
start_memory_tracking(
stream.as_raw_fd(),
executor_params.prechecking_max_memory().map(|v| {
v.try_into().unwrap_or_else(|_| {
gum::warn!(
LOG_TARGET,
%worker_pid,
"Illegal pre-checking max memory value {} discarded",
v,
);
0
})
}),
);
// Spawn another thread for preparation.
let prepare_thread = thread::spawn_worker_thread(
"prepare thread",
move || {
#[allow(unused_mut)]
let mut result = prepare_artifact(pvf, cpu_time_start);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
let mut result = result
.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
// If we are pre-checking, check for runtime construction errors.
//
// As pre-checking is more strict than just preparation in terms of memory
// and time, it is okay to do extra checks here. This takes negligible time
// anyway.
if let PrepareJobKind::Prechecking = prepare_job_kind {
result = result.and_then(|output| {
runtime_construction_check(
output.0.as_ref(),
executor_params.as_ref(),
)?;
Ok(output)
});
}
result
},
Arc::clone(&condvar),
WaitOutcome::Finished,
)?;
let outcome = thread::wait_for_threads(condvar);
let peak_alloc = {
let peak = end_memory_tracking();
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"prepare job peak allocation is {} bytes",
peak,
);
peak
};
let result = match outcome {
WaitOutcome::Finished => {
let _ = cpu_time_monitor_tx.send(());
// SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests.
let result = match unsafe { nix::unistd::fork() } {
Err(errno) => Err(error_from_errno("fork", errno)),
Ok(ForkResult::Child) => {
// Dropping the stream closes the underlying socket. We want to make sure
// that the sandboxed child can't get any kind of information from the
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
drop(stream);
// Drop the read end so we don't have too many FDs open.
drop(pipe_reader);
match prepare_thread.join().unwrap_or_else(|err| {
Err(PrepareError::Panic(stringify_panic_payload(err)))
}) {
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
},
Ok(ok) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let (artifact, cpu_time_elapsed, max_rss) = ok;
} else {
let (artifact, cpu_time_elapsed) = ok;
}
}
// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid);
let memory_stats = MemoryStats {
#[cfg(any(
target_os = "linux",
feature = "jemalloc-allocator"
))]
memory_tracker_stats,
#[cfg(target_os = "linux")]
max_rss: extract_max_rss_stat(max_rss, worker_pid),
// Negative peak allocation values are legit; they are narrow
// corner cases and shouldn't affect overall statistics
// significantly
peak_tracked_alloc: if peak_alloc > 0 {
peak_alloc as u64
} else {
0u64
},
};
// Write the serialized artifact into a temp file.
//
// PVF host only keeps artifacts statuses in its memory,
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare worker
// is only required to send `Ok` to the pool to indicate the
// success.
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: writing artifact to {}",
temp_artifact_dest.display(),
);
fs::write(&temp_artifact_dest, &artifact)?;
Ok(PrepareStats { cpu_time_elapsed, memory_stats })
},
}
handle_child_process(
pvf,
pipe_writer,
preparation_timeout,
prepare_job_kind,
executor_params,
)
},
// If the CPU thread is not selected, we signal it to end, the join handle is
// dropped and the thread will finish in the background.
WaitOutcome::TimedOut => {
match cpu_time_monitor_thread.join() {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't
// finished.
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
);
Err(PrepareError::TimedOut)
},
Ok(None) => Err(PrepareError::IoErr(
"error communicating over closed channel".into(),
)),
// Errors in this thread are independent of the PVF.
Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
}
Ok(ForkResult::Parent { child }) => {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
drop(pipe_writer);
handle_parent_process(
pipe_reader,
child,
temp_artifact_dest.clone(),
worker_pid,
usage_before,
preparation_timeout,
)
},
WaitOutcome::Pending => unreachable!(
"we run wait_while until the outcome is no longer pending; qed"
),
};
gum::trace!(
target: LOG_TARGET,
%worker_pid,
"worker: sending response to host: {:?}",
"worker: sending result to host: {:?}",
result
);
send_response(&mut stream, result)?;
@@ -385,10 +278,7 @@ pub fn worker_entrypoint(
);
}
fn prepare_artifact(
pvf: PvfPrepData,
cpu_time_start: ProcessTime,
) -> Result<(CompiledArtifact, Duration), PrepareError> {
fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
let blob = match prevalidate(&pvf.code()) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
@@ -398,7 +288,6 @@ fn prepare_artifact(
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
.map(|artifact| (artifact, cpu_time_start.elapsed()))
}
/// Try constructing the runtime to catch any instantiation errors during pre-checking.
@@ -412,3 +301,372 @@ fn runtime_construction_check(
.map(|_runtime| ())
.map_err(|err| PrepareError::RuntimeConstruction(format!("{:?}", err)))
}
#[derive(Encode, Decode)]
struct JobResponse {
artifact: CompiledArtifact,
memory_stats: MemoryStats,
}
/// This is used to handle child process during pvf prepare worker.
/// It prepares the artifact and tracks memory stats during preparation
/// and pipes back the response to the parent process
///
/// # Arguments
///
/// - `pvf`: `PvfPrepData` structure, containing data to prepare the artifact
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
///
/// - `preparation_timeout`: The timeout in `Duration`.
///
/// - `prepare_job_kind`: The kind of prepare job.
///
/// - `executor_params`: Deterministically serialized execution environment semantics.
///
/// # Returns
///
/// - If any error occur, pipe response back with `PrepareError`.
///
/// - If success, pipe back `JobResponse`.
fn handle_child_process(
pvf: PvfPrepData,
mut pipe_write: PipeWriter,
preparation_timeout: Duration,
prepare_job_kind: PrepareJobKind,
executor_params: Arc<ExecutorParams>,
) -> ! {
let worker_job_pid = process::id();
gum::debug!(
target: LOG_TARGET,
%worker_job_pid,
?prepare_job_kind,
?preparation_timeout,
"worker job: preparing artifact",
);
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();
// Run the memory tracker in a regular, non-worker thread.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let condvar_memory = Arc::clone(&condvar);
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
start_memory_tracking(
pipe_write.as_raw_fd(),
executor_params.prechecking_max_memory().map(|v| {
v.try_into().unwrap_or_else(|_| {
gum::warn!(
LOG_TARGET,
%worker_job_pid,
"Illegal pre-checking max memory value {} discarded",
v,
);
0
})
}),
);
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn_worker_thread(
"cpu time monitor thread",
move || cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx),
Arc::clone(&condvar),
WaitOutcome::TimedOut,
)
.unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
});
let prepare_thread = spawn_worker_thread(
"prepare worker",
move || {
#[allow(unused_mut)]
let mut result = prepare_artifact(pvf);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
let mut result = result.map(|artifact| (artifact, get_max_rss_thread()));
// If we are pre-checking, check for runtime construction errors.
//
// As pre-checking is more strict than just preparation in terms of memory
// and time, it is okay to do extra checks here. This takes negligible time
// anyway.
if let PrepareJobKind::Prechecking = prepare_job_kind {
result = result.and_then(|output| {
runtime_construction_check(output.0.as_ref(), &executor_params)?;
Ok(output)
});
}
result
},
Arc::clone(&condvar),
WaitOutcome::Finished,
)
.unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
});
let outcome = thread::wait_for_threads(condvar);
let peak_alloc = {
let peak = end_memory_tracking();
gum::debug!(
target: LOG_TARGET,
%worker_job_pid,
"prepare job peak allocation is {} bytes",
peak,
);
peak
};
let result = match outcome {
WaitOutcome::Finished => {
let _ = cpu_time_monitor_tx.send(());
match prepare_thread.join().unwrap_or_else(|err| {
send_child_response(
&mut pipe_write,
Err(PrepareError::JobError(stringify_panic_payload(err))),
)
}) {
Err(err) => Err(err),
Ok(ok) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let (artifact, max_rss) = ok;
} else {
let artifact = ok;
}
}
// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, process::id());
let memory_stats = MemoryStats {
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
memory_tracker_stats,
#[cfg(target_os = "linux")]
max_rss: extract_max_rss_stat(max_rss, process::id()),
// Negative peak allocation values are legit; they are narrow
// corner cases and shouldn't affect overall statistics
// significantly
peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 },
};
Ok(JobResponse { artifact, memory_stats })
},
}
},
// If the CPU thread is not selected, we signal it to end, the join handle is
// dropped and the thread will finish in the background.
WaitOutcome::TimedOut => match cpu_time_monitor_thread.join() {
Ok(Some(_cpu_time_elapsed)) => Err(PrepareError::TimedOut),
Ok(None) => Err(PrepareError::IoErr("error communicating over closed channel".into())),
Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
},
WaitOutcome::Pending =>
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
};
send_child_response(&mut pipe_write, result);
}
/// Waits for child process to finish and handle child response from pipe.
///
/// # Arguments
///
/// - `pipe_read`: A `PipeReader` used to read data from the child process.
///
/// - `child`: The child pid.
///
/// - `temp_artifact_dest`: The destination `PathBuf` to write the temporary artifact file.
///
/// - `worker_pid`: The PID of the child process.
///
/// - `usage_before`: Resource usage statistics before executing the child process.
///
/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`.
///
/// # Returns
///
/// - If the child send response without an error, this function returns `Ok(PrepareStats)`
/// containing memory and CPU usage statistics.
///
/// - If the child send response with an error, it returns a `PrepareError` with that error.
///
/// - If the child process timeout, it returns `PrepareError::TimedOut`.
fn handle_parent_process(
mut pipe_read: PipeReader,
child: Pid,
temp_artifact_dest: PathBuf,
worker_pid: u32,
usage_before: Usage,
timeout: Duration,
) -> Result<PrepareStats, PrepareError> {
// Read from the child. Don't decode unless the process exited normally, which we check later.
let mut received_data = Vec::new();
pipe_read
.read_to_end(&mut received_data)
.map_err(|err| PrepareError::IoErr(err.to_string()))?;
let status = nix::sys::wait::waitpid(child, None);
gum::trace!(
target: LOG_TARGET,
%worker_pid,
"prepare worker received wait status from job: {:?}",
status,
);
let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
.map_err(|errno| error_from_errno("getrusage after", errno))?;
// Using `getrusage` is needed to check whether child has timedout since we cannot rely on
// child to report its own time.
// As `getrusage` returns resource usage from all terminated child processes,
// it is necessary to subtract the usage before the current child process to isolate its cpu
// time
let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_tv.as_millis(),
timeout.as_millis(),
);
return Err(PrepareError::TimedOut)
}
match status {
Ok(WaitStatus::Exited(_pid, exit_status)) => {
let mut reader = io::BufReader::new(received_data.as_slice());
let result = recv_child_response(&mut reader)
.map_err(|err| PrepareError::JobError(err.to_string()))?;
match result {
Err(err) => Err(err),
Ok(response) => {
// The exit status should have been zero if no error occurred.
if exit_status != 0 {
return Err(PrepareError::JobError(format!(
"unexpected exit status: {}",
exit_status
)))
}
// Write the serialized artifact into a temp file.
//
// PVF host only keeps artifacts statuses in its memory,
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare worker
// is only required to send `Ok` to the pool to indicate the
// success.
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: writing artifact to {}",
temp_artifact_dest.display(),
);
// Write to the temp file created by the host.
if let Err(err) = fs::write(&temp_artifact_dest, &response.artifact) {
return Err(PrepareError::IoErr(err.to_string()))
};
Ok(PrepareStats {
memory_stats: response.memory_stats,
cpu_time_elapsed: cpu_tv,
})
},
}
},
// The job was killed by the given signal.
//
// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
// other reason, so we still need to check for seccomp violations elsewhere.
Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) =>
Err(PrepareError::JobDied(format!("received signal: {signal:?}"))),
Err(errno) => Err(error_from_errno("waitpid", errno)),
// An attacker can make the child process return any exit status it wants. So we can treat
// all unexpected cases the same way.
Ok(unexpected_wait_status) => Err(PrepareError::JobDied(format!(
"unexpected status from wait: {unexpected_wait_status:?}"
))),
}
}
/// Calculate the total CPU time from the given `usage` structure, returned from
/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
/// and system time.
///
/// # Arguments
///
/// - `rusage`: Contains resource usage information.
///
/// # Returns
///
/// Returns a `Duration` representing the total CPU time.
fn get_total_cpu_usage(rusage: Usage) -> Duration {
let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
return Duration::from_micros(micros)
}
/// Get a job response.
fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<JobResult> {
let response_bytes = framed_recv_blocking(received_data)?;
JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_child_response: decode error: {:?}", e),
)
})
}
/// Write a job response to the pipe and exit process after.
///
/// # Arguments
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
///
/// - `response`: Child process response
fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
framed_send_blocking(pipe_write, response.encode().as_slice())
.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
if response.is_ok() {
process::exit(libc::EXIT_SUCCESS)
} else {
process::exit(libc::EXIT_FAILURE)
}
}
fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {
PrepareError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error()))
}
type JobResult = Result<JobResponse, PrepareError>;
/// Pre-encoded length-prefixed `Result::Err(PrepareError::OutOfMemory)`
const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";
#[test]
fn pre_encoded_payloads() {
// NOTE: This must match the type of `response` in `send_child_response`.
let oom_unencoded: JobResult = Result::Err(PrepareError::OutOfMemory);
let oom_encoded = oom_unencoded.encode();
// The payload is prefixed with its length in `framed_send`.
let mut oom_payload = oom_encoded.len().to_le_bytes().to_vec();
oom_payload.extend(oom_encoded);
assert_eq!(oom_payload, OOM_PAYLOAD);
}
+27 -26
View File
@@ -33,36 +33,37 @@ pub enum ValidationError {
pub enum InvalidCandidate {
/// PVF preparation ended up with a deterministic error.
PrepareError(String),
/// The failure is reported by the execution worker. The string contains the error message.
WorkerReportedError(String),
/// The worker has died during validation of a candidate. That may fall in one of the following
/// categories, which we cannot distinguish programmatically:
/// The candidate is reported to be invalid by the execution worker. The string contains the
/// error message.
WorkerReportedInvalid(String),
/// The worker process (not the job) has died during validation of a candidate.
///
/// (a) Some sort of transient glitch caused the worker process to abort. An example would be
/// that the host machine ran out of free memory and the OOM killer started killing the
/// processes, and in order to save the parent it will "sacrifice child" first.
///
/// (b) The candidate triggered a code path that has lead to the process death. For example,
/// the PVF found a way to consume unbounded amount of resources and then it either
/// exceeded an `rlimit` (if set) or, again, invited OOM killer. Another possibility is a
/// bug in wasmtime allowed the PVF to gain control over the execution worker.
///
/// We attribute such an event to an *invalid candidate* in either case.
///
/// The rationale for this is that a glitch may lead to unfair rejecting candidate by a single
/// validator. If the glitch is somewhat more persistent the validator will reject all
/// candidate thrown at it and hopefully the operator notices it by decreased reward
/// performance of the validator. On the other hand, if the worker died because of (b) we would
/// have better chances to stop the attack.
/// It's unlikely that this is caused by malicious code since workers spawn separate job
/// processes, and those job processes are sandboxed. But, it is possible. We retry in this
/// case, and if the error persists, we assume it's caused by the candidate and vote against.
AmbiguousWorkerDeath,
/// PVF execution (compilation is not included) took more time than was allotted.
HardTimeout,
/// A panic occurred and we can't be sure whether the candidate is really invalid or some
/// internal glitch occurred. Whenever we are unsure, we can never treat an error as internal
/// as we would abstain from voting. This is bad because if the issue was due to the candidate,
/// then all validators would abstain, stalling finality on the chain. So we will first retry
/// the candidate, and if the issue persists we are forced to vote invalid.
Panic(String),
/// The job process (not the worker) has died for one of the following reasons:
///
/// (a) A seccomp violation occurred, most likely due to an attempt by malicious code to
/// execute arbitrary code. Note that there is no foolproof way to detect this if the operator
/// has seccomp auditing disabled.
///
/// (b) The host machine ran out of free memory and the OOM killer started killing the
/// processes, and in order to save the parent it will "sacrifice child" first.
///
/// (c) Some other reason, perhaps transient or perhaps caused by malicious code.
///
/// We cannot treat this as an internal error because malicious code may have caused this.
AmbiguousJobDeath(String),
/// An unexpected error occurred in the job process and we can't be sure whether the candidate
/// is really invalid or some internal glitch occurred. Whenever we are unsure, we can never
/// treat an error as internal as we would abstain from voting. This is bad because if the
/// issue was due to the candidate, then all validators would abstain, stalling finality on the
/// chain. So we will first retry the candidate, and if the issue persists we are forced to
/// vote invalid.
JobError(String),
}
impl From<InternalValidationError> for ValidationError {
+11 -4
View File
@@ -342,20 +342,27 @@ fn handle_job_finish(
},
Outcome::InvalidCandidate { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(err))),
Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedInvalid(err))),
None,
),
Outcome::InternalError { err } => (None, Err(ValidationError::InternalError(err)), None),
// Either the worker or the job timed out. Kill the worker in either case. Treated as
// definitely-invalid, because if we timed out, there's no time left for a retry.
Outcome::HardTimeout =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), None),
// "Maybe invalid" errors (will retry).
Outcome::IoErr => (
Outcome::WorkerIntfErr => (
None,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)),
None,
),
Outcome::Panic { err } =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::Panic(err))), None),
Outcome::JobDied { err } => (
None,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousJobDeath(err))),
None,
),
Outcome::JobError { err } =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::JobError(err))), None),
};
queue.metrics.execute_finished();
@@ -30,7 +30,7 @@ use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, Response},
execute::{Handshake, WorkerResponse},
worker_dir, SecurityStatus,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
@@ -88,19 +88,26 @@ pub enum Outcome {
/// a trap. Errors related to the preparation process are not expected to be encountered by the
/// execution workers.
InvalidCandidate { err: String, idle_worker: IdleWorker },
/// The execution time exceeded the hard limit. The worker is terminated.
HardTimeout,
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
WorkerIntfErr,
/// The job process has died. We must kill the worker just in case.
///
/// We cannot treat this as an internal error because malicious code may have caused this.
JobDied { err: String },
/// An unexpected error occurred in the job process.
///
/// Because malicious code can cause a job error, we must not treat it as an internal error.
JobError { err: String },
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
///
/// Should only ever be used for errors independent of the candidate and PVF. Therefore it may
/// be a problem with the worker, so we terminate it.
InternalError { err: InternalValidationError },
/// The execution time exceeded the hard limit. The worker is terminated.
HardTimeout,
/// An I/O error happened during communication with the worker. This may mean that the worker
/// process already died. The token is not returned in any case.
IoErr,
/// An unexpected panic has occurred in the execution worker.
Panic { err: String },
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -137,7 +144,7 @@ pub async fn start_work(
?error,
"failed to send an execute request",
);
return Outcome::IoErr
return Outcome::WorkerIntfErr
}
// We use a generous timeout here. This is in addition to the one in the child process, in
@@ -173,7 +180,7 @@ pub async fn start_work(
);
}
return Outcome::IoErr
return Outcome::WorkerIntfErr
},
Ok(response) => {
// Check if any syscall violations occurred during the job. For now this is
@@ -189,7 +196,7 @@ pub async fn start_work(
);
}
if let Response::Ok{duration, ..} = response {
if let WorkerResponse::Ok{duration, ..} = response {
if duration > execution_timeout {
// The job didn't complete within the timeout.
gum::warn!(
@@ -201,7 +208,7 @@ pub async fn start_work(
);
// Return a timeout error.
return Outcome::HardTimeout;
return Outcome::HardTimeout
}
}
@@ -216,23 +223,25 @@ pub async fn start_work(
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded lenient timeout for execution, child worker likely stalled",
);
Response::TimedOut
WorkerResponse::JobTimedOut
},
};
match response {
Response::Ok { result_descriptor, duration } => Outcome::Ok {
WorkerResponse::Ok { result_descriptor, duration } => Outcome::Ok {
result_descriptor,
duration,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
Response::InvalidCandidate(err) => Outcome::InvalidCandidate {
WorkerResponse::InvalidCandidate(err) => Outcome::InvalidCandidate {
err,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
Response::TimedOut => Outcome::HardTimeout,
Response::Panic(err) => Outcome::Panic { err },
Response::InternalError(err) => Outcome::InternalError { err },
WorkerResponse::JobTimedOut => Outcome::HardTimeout,
WorkerResponse::JobDied(err) => Outcome::JobDied { err },
WorkerResponse::JobError(err) => Outcome::JobError { err },
WorkerResponse::InternalError(err) => Outcome::InternalError { err },
}
})
.await
@@ -306,9 +315,9 @@ async fn send_request(
framed_send(stream, &execution_timeout.encode()).await
}
async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
async fn recv_response(stream: &mut UnixStream) -> io::Result<WorkerResponse> {
let response_bytes = framed_recv(stream).await?;
Response::decode(&mut &response_bytes[..]).map_err(|e| {
WorkerResponse::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("execute pvf recv_response: decode error: {:?}", e),
+18 -3
View File
@@ -339,17 +339,17 @@ fn handle_mux(
spawned,
worker,
idle,
Err(PrepareError::CreateTmpFileErr(err)),
Err(PrepareError::CreateTmpFile(err)),
),
// Return `Concluded`, but do not kill the worker since the error was on the host
// side.
Outcome::RenameTmpFileErr { worker: idle, result: _, err, src, dest } =>
Outcome::RenameTmpFile { worker: idle, result: _, err, src, dest } =>
handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::RenameTmpFileErr { err, src, dest }),
Err(PrepareError::RenameTmpFile { err, src, dest }),
),
// Could not clear worker cache. Kill the worker so other jobs can't see the data.
Outcome::ClearWorkerDir { err } => {
@@ -387,6 +387,21 @@ fn handle_mux(
Ok(())
},
// The worker might still be usable, but we kill it just in case.
Outcome::JobDied(err) => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::JobDied(err)),
},
)?;
}
Ok(())
},
Outcome::TimedOut => {
if attempt_retire(metrics, spawned, worker) {
reply(
@@ -79,7 +79,7 @@ pub enum Outcome {
CreateTmpFileErr { worker: IdleWorker, err: String },
/// The response from the worker is received, but the tmp file cannot be renamed (moved) to the
/// final destination location.
RenameTmpFileErr {
RenameTmpFile {
worker: IdleWorker,
result: PrepareResult,
err: String,
@@ -100,6 +100,10 @@ pub enum Outcome {
IoErr(String),
/// The worker ran out of memory and is aborting. The worker should be ripped.
OutOfMemory,
/// The preparation job process died, due to OOM, a seccomp violation, or some other factor.
///
/// The worker might still be usable, but we kill it just in case.
JobDied(String),
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -187,21 +191,6 @@ pub async fn start_work(
"failed to recv a prepare response: {:?}",
err,
);
// The worker died. Check if it was due to a seccomp violation.
//
// NOTE: Log, but don't change the outcome. Not all validators may have auditing
// enabled, so we don't want attackers to abuse a non-deterministic outcome.
for syscall in security::check_seccomp_violations_for_worker(audit_log_file, pid).await {
gum::error!(
target: LOG_TARGET,
worker_pid = %pid,
%syscall,
?pvf,
"A forbidden syscall was attempted! This is a violation of our seccomp security policy. Report an issue ASAP!"
);
}
Outcome::IoErr(err.to_string())
},
Err(_) => {
@@ -236,6 +225,7 @@ async fn handle_response(
Ok(result) => result,
// Timed out on the child. This should already be logged by the child.
Err(PrepareError::TimedOut) => return Outcome::TimedOut,
Err(PrepareError::JobDied(err)) => return Outcome::JobDied(err),
Err(PrepareError::OutOfMemory) => return Outcome::OutOfMemory,
Err(_) => return Outcome::Concluded { worker, result },
};
@@ -272,7 +262,7 @@ async fn handle_response(
artifact_path.display(),
err,
);
Outcome::RenameTmpFileErr {
Outcome::RenameTmpFile {
worker,
result,
err: format!("{:?}", err),
+20 -14
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Various things for testing other crates.
//! Various utilities for testing.
pub use crate::{
host::{EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME},
@@ -59,27 +59,33 @@ pub fn validate_candidate(
///
/// NOTE: This should only be called in dev code (tests, benchmarks) as it relies on the relative
/// paths of the built workers.
pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) {
pub fn build_workers_and_get_paths(is_bench: bool) -> (PathBuf, PathBuf) {
// Only needs to be called once for the current process.
static WORKER_PATHS: OnceLock<Mutex<(PathBuf, PathBuf)>> = OnceLock::new();
fn build_workers() {
let build_args = vec![
fn build_workers(is_bench: bool) {
let mut build_args = vec![
"build",
"--package=polkadot",
"--bin=polkadot-prepare-worker",
"--bin=polkadot-execute-worker",
];
let exit_status = std::process::Command::new("cargo")
if is_bench {
// Benches require --release. Regular tests are debug (no flag needed).
build_args.push("--release");
}
let mut cargo = std::process::Command::new("cargo");
let cmd = cargo
// wasm runtime not needed
.env("SKIP_WASM_BUILD", "1")
.args(build_args)
.stdout(std::process::Stdio::piped())
.status()
.expect("Failed to run the build program");
.stdout(std::process::Stdio::piped());
println!("INFO: calling `{cmd:?}`");
let exit_status = cmd.status().expect("Failed to run the build program");
if !exit_status.success() {
eprintln!("Failed to build workers: {}", exit_status.code().unwrap());
eprintln!("ERROR: Failed to build workers: {}", exit_status.code().unwrap());
std::process::exit(1);
}
}
@@ -95,23 +101,23 @@ pub fn get_and_check_worker_paths() -> (PathBuf, PathBuf) {
// explain why a build happens
if !prepare_worker_path.is_executable() {
eprintln!("Prepare worker does not exist or is not executable. Workers directory: {:?}", workers_path);
println!("WARN: Prepare worker does not exist or is not executable. Workers directory: {:?}", workers_path);
}
if !execute_worker_path.is_executable() {
eprintln!("Execute worker does not exist or is not executable. Workers directory: {:?}", workers_path);
println!("WARN: Execute worker does not exist or is not executable. Workers directory: {:?}", workers_path);
}
if let Ok(ver) = get_worker_version(&prepare_worker_path) {
if ver != NODE_VERSION {
eprintln!("Prepare worker version {ver} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}");
println!("WARN: Prepare worker version {ver} does not match node version {NODE_VERSION}; worker path: {prepare_worker_path:?}");
}
}
if let Ok(ver) = get_worker_version(&execute_worker_path) {
if ver != NODE_VERSION {
eprintln!("Execute worker version {ver} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}");
println!("WARN: Execute worker version {ver} does not match node version {NODE_VERSION}; worker path: {execute_worker_path:?}");
}
}
build_workers();
build_workers(is_bench);
Mutex::new((prepare_worker_path, execute_worker_path))
});
+26 -109
View File
@@ -19,23 +19,23 @@
use assert_matches::assert_matches;
use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{
start, testing::get_and_check_worker_paths, Config, InvalidCandidate, Metrics, PrepareError,
start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics, PrepareError,
PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::{ExecutorParam, ExecutorParams};
#[cfg(target_os = "linux")]
use rusty_fork::rusty_fork_test;
use std::time::Duration;
use tokio::sync::Mutex;
mod adder;
#[cfg(target_os = "linux")]
mod process;
mod worker_common;
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);
struct TestHost {
cache_dir: tempfile::TempDir,
@@ -51,7 +51,7 @@ impl TestHost {
where
F: FnOnce(&mut Config),
{
let (prepare_worker_path, execute_worker_path) = get_and_check_worker_paths();
let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths(false);
let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
@@ -126,7 +126,26 @@ impl TestHost {
}
#[tokio::test]
async fn terminates_on_timeout() {
async fn prepare_job_terminates_on_timeout() {
let host = TestHost::new().await;
let start = std::time::Instant::now();
let result = host
.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default())
.await;
match result {
Err(PrepareError::TimedOut) => {},
r => panic!("{:?}", r),
}
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_PREPARATION_TIMEOUT);
assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
}
#[tokio::test]
async fn execute_job_terminates_on_timeout() {
let host = TestHost::new().await;
let start = std::time::Instant::now();
@@ -153,108 +172,6 @@ async fn terminates_on_timeout() {
assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
}
#[cfg(target_os = "linux")]
fn kill_by_sid_and_name(sid: i32, exe_name: &'static str) {
use procfs::process;
let all_processes: Vec<process::Process> = process::all_processes()
.expect("Can't read /proc")
.filter_map(|p| match p {
Ok(p) => Some(p), // happy path
Err(e) => match e {
// process vanished during iteration, ignore it
procfs::ProcError::NotFound(_) => None,
x => {
panic!("some unknown error: {}", x);
},
},
})
.collect();
for process in all_processes {
if process.stat().unwrap().session == sid &&
process.exe().unwrap().to_str().unwrap().contains(exe_name)
{
assert_eq!(unsafe { libc::kill(process.pid(), 9) }, 0);
}
}
}
// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
// then killing the worker process that matches the session ID and expected worker name.
#[cfg(target_os = "linux")]
rusty_fork_test! {
// What happens when the prepare worker dies in the middle of a job?
#[test]
fn prepare_worker_killed_during_job() {
const PROCESS_NAME: &'static str = "polkadot-prepare-worker";
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Run a future that kills the job in the middle of the timeout.
async {
tokio::time::sleep(TEST_PREPARATION_TIMEOUT / 2).await;
kill_by_sid_and_name(sid, PROCESS_NAME);
}
);
assert_matches!(result, Err(PrepareError::IoErr(_)));
})
}
// What happens when the execute worker dies in the middle of a job?
#[test]
fn execute_worker_killed_during_job() {
const PROCESS_NAME: &'static str = "polkadot-execute-worker";
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
let (result, _) = futures::join!(
// Choose an job that would normally take the entire timeout.
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
),
// Run a future that kills the job in the middle of the timeout.
async {
tokio::time::sleep(TEST_EXECUTION_TIMEOUT / 2).await;
kill_by_sid_and_name(sid, PROCESS_NAME);
}
);
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
})
}
}
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn ensure_parallel_execution() {
+383
View File
@@ -0,0 +1,383 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test unexpected behaviors of the spawned processes. We test both worker processes (directly
//! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs).
use super::TestHost;
use assert_matches::assert_matches;
use polkadot_node_core_pvf::{InvalidCandidate, PrepareError, ValidationError};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams};
use procfs::process;
use rusty_fork::rusty_fork_test;
use std::time::Duration;
const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker";
const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker";
const SIGNAL_KILL: i32 = 9;
const SIGNAL_STOP: i32 = 19;
fn send_signal_by_sid_and_name(
sid: i32,
exe_name: &'static str,
is_direct_child: bool,
signal: i32,
) {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0);
}
fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
process.stat().unwrap().num_threads
}
fn find_process_by_sid_and_name(
sid: i32,
exe_name: &'static str,
is_direct_child: bool,
) -> process::Process {
let all_processes: Vec<process::Process> = process::all_processes()
.expect("Can't read /proc")
.filter_map(|p| match p {
Ok(p) => Some(p), // happy path
Err(e) => match e {
// process vanished during iteration, ignore it
procfs::ProcError::NotFound(_) => None,
x => {
panic!("some unknown error: {}", x);
},
},
})
.collect();
let mut found = None;
for process in all_processes {
let stat = process.stat().unwrap();
if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) {
continue
}
// The workers are direct children of the current process, the worker job processes are not
// (they are children of the workers).
let process_is_direct_child = stat.ppid as u32 == std::process::id();
if is_direct_child != process_is_direct_child {
continue
}
if found.is_some() {
panic!("Found more than one process")
}
found = Some(process);
}
found.expect("Should have found the expected process")
}
// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
// then doing something with the child process that matches the session ID and expected process
// name.
rusty_fork_test! {
// What happens when the prepare worker (not the job) times out?
#[test]
fn prepare_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Send a stop signal to pause the worker.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_STOP);
}
);
assert_matches!(result, Err(PrepareError::TimedOut));
})
}
// What happens when the execute worker (not the job) times out?
#[test]
fn execute_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
let (result, _) = futures::join!(
// Choose an job that would normally take the entire timeout.
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
),
// Send a stop signal to pause the worker.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_STOP);
}
);
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout))
);
})
}
// What happens when the prepare worker dies in the middle of a job?
#[test]
fn prepare_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_KILL);
}
);
assert_matches!(result, Err(PrepareError::IoErr(_)));
})
}
// What happens when the execute worker dies in the middle of a job?
#[test]
fn execute_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
let (result, _) = futures::join!(
// Choose an job that would normally take the entire timeout.
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
),
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_KILL);
}
);
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
})
}
// What happens when the forked prepare job dies in the middle of its job?
#[test]
fn forked_prepare_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false, SIGNAL_KILL);
}
);
// Note that we get a more specific error if the job died than if the whole worker died.
assert_matches!(
result,
Err(PrepareError::JobDied(err)) if err == "received signal: SIGKILL"
);
})
}
// What happens when the forked execute job dies in the middle of its job?
#[test]
fn forked_execute_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
),
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false, SIGNAL_KILL);
}
);
// Note that we get a more specific error if the job died than if the whole worker died.
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousJobDeath(err)))
if err == "received signal: SIGKILL"
);
})
}
// Ensure that the spawned prepare worker is single-threaded.
//
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_prepare_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
let _ = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(
get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true),
1
);
// Child job should have three threads: main thread, execute thread, CPU time
// monitor, and memory tracking.
assert_eq!(
get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false),
4
);
// End the test.
send_signal_by_sid_and_name(sid, PREPARE_PROCESS_NAME, true, SIGNAL_KILL);
}
);
})
}
// Ensure that the spawned execute worker is single-threaded.
//
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_execute_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
let _ = futures::join!(
// Choose a job that would normally take the entire timeout.
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
),
// Run a future that tests the thread count while the worker is running.
async {
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(
get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true),
1
);
// Child job should have three threads: main thread, execute thread, and CPU
// time monitor.
assert_eq!(
get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false),
3
);
// End the test.
send_signal_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, true, SIGNAL_KILL);
}
);
})
}
}
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_core_pvf::{
testing::{get_and_check_worker_paths, spawn_with_program_path, SpawnErr},
testing::{build_workers_and_get_paths, spawn_with_program_path, SpawnErr},
SecurityStatus,
};
use std::{env, time::Duration};
@@ -23,7 +23,7 @@ use std::{env, time::Duration};
// Test spawning a program that immediately exits with a failure code.
#[tokio::test]
async fn spawn_immediate_exit() {
let (prepare_worker_path, _) = get_and_check_worker_paths();
let (prepare_worker_path, _) = build_workers_and_get_paths(false);
// There's no explicit `exit` subcommand in the worker; it will panic on an unknown
// subcommand anyway
@@ -41,7 +41,7 @@ async fn spawn_immediate_exit() {
#[tokio::test]
async fn spawn_timeout() {
let (_, execute_worker_path) = get_and_check_worker_paths();
let (_, execute_worker_path) = build_workers_and_get_paths(false);
let result = spawn_with_program_path(
"integration-test",
@@ -57,7 +57,7 @@ async fn spawn_timeout() {
#[tokio::test]
async fn should_connect() {
let (prepare_worker_path, _) = get_and_check_worker_paths();
let (prepare_worker_path, _) = build_workers_and_get_paths(false);
let _ = spawn_with_program_path(
"integration-test",