Use clone instead of fork on pvf (#2477)

@mrcnski I've made the change on the prepare worker; once the prepare worker
part looks good, I'll do the same for the execute worker.

This is based on
https://github.com/koute/polkavm/blob/11beebd06276ce9b84f335350138479e714f6caf/crates/polkavm/src/sandbox/linux.rs#L711.

## TODO

- [x] Add a check for this capability at startup
- [x] Add prdoc mentioning the new Secure Validator Mode (optional)
requirement.

## Related

Closes #2162

---------

Co-authored-by: Marcin S <marcin@realemail.net>
This commit is contained in:
jserrat
2024-01-21 11:15:36 +00:00
committed by GitHub
parent caa987d26d
commit 21ef949b6e
20 changed files with 861 additions and 395 deletions
Generated
+2 -12
View File
@@ -9028,16 +9028,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "os_pipe"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177"
dependencies = [
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "os_str_bytes"
version = "6.5.1"
@@ -12744,6 +12734,7 @@ dependencies = [
"futures",
"landlock",
"libc",
"nix 0.27.1",
"parity-scale-codec",
"polkadot-parachain-primitives",
"polkadot-primitives",
@@ -12764,10 +12755,10 @@ dependencies = [
name = "polkadot-node-core-pvf-execute-worker"
version = "1.0.0"
dependencies = [
"cfg-if",
"cpu-time",
"libc",
"nix 0.27.1",
"os_pipe",
"parity-scale-codec",
"polkadot-node-core-pvf-common",
"polkadot-parachain-primitives",
@@ -12784,7 +12775,6 @@ dependencies = [
"criterion 0.4.0",
"libc",
"nix 0.27.1",
"os_pipe",
"parity-scale-codec",
"polkadot-node-core-pvf-common",
"polkadot-primitives",
+1
View File
@@ -33,6 +33,7 @@ sp-tracing = { path = "../../../../../substrate/primitives/tracing" }
[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.3.0"
nix = { version = "0.27.1", features = ["sched"] }
seccompiler = "0.4.0"
[dev-dependencies]
+3 -2
View File
@@ -92,10 +92,11 @@ pub enum JobError {
TimedOut,
#[error("An unexpected panic has occurred in the execution job: {0}")]
Panic(String),
/// Some error occurred when interfacing with the kernel.
#[error("Error interfacing with the kernel: {0}")]
Kernel(String),
#[error("Could not spawn the requested thread: {0}")]
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
#[error("Could not set pdeathsig: {0}")]
CouldNotSetPdeathsig(String),
}
@@ -140,7 +140,7 @@ pub unsafe fn create_runtime_from_artifact_bytes(
executor_params: &ExecutorParams,
) -> Result<WasmtimeRuntime, WasmError> {
let mut config = DEFAULT_CONFIG.clone();
config.semantics = params_to_wasmtime_semantics(executor_params);
config.semantics = params_to_wasmtime_semantics(executor_params).0;
sc_executor_wasmtime::create_runtime_from_artifact_bytes::<HostFunctions>(
compiled_artifact_blob,
@@ -148,7 +148,10 @@ pub unsafe fn create_runtime_from_artifact_bytes(
)
}
pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Semantics {
/// Takes the default config and overwrites any settings with existing executor parameters.
///
/// Returns the semantics as well as the stack limit (since we are guaranteed to have it).
pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> (Semantics, DeterministicStackLimit) {
let mut sem = DEFAULT_CONFIG.semantics.clone();
let mut stack_limit = sem
.deterministic_stack_limit
@@ -169,8 +172,8 @@ pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Semantics {
ExecutorParam::PvfExecTimeout(_, _) => (), /* Not used here */
}
}
sem.deterministic_stack_limit = Some(stack_limit);
sem
sem.deterministic_stack_limit = Some(stack_limit.clone());
(sem, stack_limit)
}
/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
@@ -187,7 +190,7 @@ pub fn prepare(
blob: RuntimeBlob,
executor_params: &ExecutorParams,
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
let semantics = params_to_wasmtime_semantics(executor_params);
let (semantics, _) = params_to_wasmtime_semantics(executor_params);
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}
+2
View File
@@ -57,6 +57,8 @@ pub struct SecurityStatus {
pub can_enable_seccomp: bool,
/// Whether we are able to unshare the user namespace and change the filesystem root.
pub can_unshare_user_namespace_and_change_root: bool,
/// Whether we are able to call `clone` with all sandboxing flags.
pub can_do_secure_clone: bool,
}
/// A handshake with information for the worker.
+128 -32
View File
@@ -18,14 +18,19 @@
pub mod security;
use crate::{framed_recv_blocking, WorkerHandshake, LOG_TARGET};
use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET};
use cpu_time::ProcessTime;
use futures::never::Never;
use parity_scale_codec::Decode;
use std::{
any::Any,
fmt, io,
os::unix::net::UnixStream,
fmt::{self},
fs::File,
io::{self, Read, Write},
os::{
fd::{AsRawFd, FromRawFd, RawFd},
unix::net::UnixStream,
},
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
time::Duration,
@@ -78,7 +83,7 @@ macro_rules! decl_worker_main {
"--check-can-enable-landlock" => {
#[cfg(target_os = "linux")]
let status = if let Err(err) = security::landlock::check_is_fully_enabled() {
let status = if let Err(err) = security::landlock::check_can_fully_enable() {
// Write the error to stderr, log it on the host-side.
eprintln!("{}", err);
-1
@@ -91,7 +96,7 @@ macro_rules! decl_worker_main {
},
"--check-can-enable-seccomp" => {
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
let status = if let Err(err) = security::seccomp::check_is_fully_enabled() {
let status = if let Err(err) = security::seccomp::check_can_fully_enable() {
// Write the error to stderr, log it on the host-side.
eprintln!("{}", err);
-1
@@ -107,7 +112,7 @@ macro_rules! decl_worker_main {
let cache_path_tempdir = std::path::Path::new(&args[2]);
#[cfg(target_os = "linux")]
let status = if let Err(err) =
security::change_root::check_is_fully_enabled(&cache_path_tempdir)
security::change_root::check_can_fully_enable(&cache_path_tempdir)
{
// Write the error to stderr, log it on the host-side.
eprintln!("{}", err);
@@ -119,6 +124,21 @@ macro_rules! decl_worker_main {
let status = -1;
std::process::exit(status)
},
"--check-can-do-secure-clone" => {
#[cfg(target_os = "linux")]
// SAFETY: new process is spawned within a single threaded process. This
// invariant is enforced by tests.
let status = if let Err(err) = unsafe { security::clone::check_can_fully_clone() } {
// Write the error to stderr, log it on the host-side.
eprintln!("{}", err);
-1
} else {
0
};
#[cfg(not(target_os = "linux"))]
let status = -1;
std::process::exit(status)
},
"test-sleep" => {
std::thread::sleep(std::time::Duration::from_secs(5));
@@ -171,6 +191,84 @@ macro_rules! decl_worker_main {
};
}
/// Creates an anonymous pipe whose two ends are both opened with `O_CLOEXEC`.
///
/// Returns the raw descriptors as `(read_end, write_end)`; the caller owns both and is
/// responsible for closing them.
///
/// Adapted from the `os_pipe` crate. It is copied here to drop one dependency and because that
/// crate's type-safe abstractions do not play well with nix's `clone`.
///
/// # Errors
///
/// Returns the last OS error if `pipe2` fails.
#[cfg(not(target_os = "macos"))]
pub fn pipe2_cloexec() -> io::Result<(libc::c_int, libc::c_int)> {
    let mut pipe_fds: [libc::c_int; 2] = [0; 2];
    // SAFETY: `pipe_fds` is a valid, writable array of exactly two `c_int`s.
    match unsafe { libc::pipe2(pipe_fds.as_mut_ptr(), libc::O_CLOEXEC) } {
        0 => {
            let [read_end, write_end] = pipe_fds;
            Ok((read_end, write_end))
        },
        _ => Err(io::Error::last_os_error()),
    }
}
/// Creates an anonymous pipe and marks both ends close-on-exec.
///
/// This variant is compiled on macOS, where a plain `pipe` call is followed by one
/// `fcntl(F_SETFD, FD_CLOEXEC)` call per descriptor.
///
/// Returns the raw descriptors as `(read_end, write_end)`; the caller owns both and is
/// responsible for closing them.
///
/// # Errors
///
/// Returns the last OS error if `pipe` or either `fcntl` call fails. If `fcntl` fails, both
/// descriptors are closed before returning so that they are not leaked.
#[cfg(target_os = "macos")]
pub fn pipe2_cloexec() -> io::Result<(libc::c_int, libc::c_int)> {
    let mut fds: [libc::c_int; 2] = [0; 2];
    // SAFETY: `fds` is a valid, writable array of exactly two `c_int`s.
    let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
    if res != 0 {
        return Err(io::Error::last_os_error())
    }

    for &fd in &fds {
        // SAFETY: `fd` was just returned by `pipe` and is open and owned by this function.
        let res = unsafe { libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) };
        if res != 0 {
            // Capture the error first: `close` below may overwrite `errno`.
            let err = io::Error::last_os_error();
            for &open_fd in &fds {
                // SAFETY: both descriptors are still open and owned by this function. The
                // result is ignored because we are already reporting the `fcntl` failure.
                unsafe {
                    libc::close(open_fd);
                }
            }
            return Err(err)
        }
    }

    Ok((fds[0], fds[1]))
}
/// Owns one end of a pipe and deliberately exposes only a small surface: reading, writing, and
/// conversion from/to a raw file descriptor.
///
/// The descriptor is held as a [`File`] internally.
pub struct PipeFd {
    file: File,
}

impl AsRawFd for PipeFd {
    /// Returns the raw file descriptor backing this `PipeFd`.
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.file)
    }
}

impl FromRawFd for PipeFd {
    /// Wraps a raw file descriptor in a new `PipeFd`.
    ///
    /// # Safety
    ///
    /// The fd passed in must be an owned file descriptor; in particular, it must be open.
    unsafe fn from_raw_fd(fd: RawFd) -> Self {
        let file = File::from_raw_fd(fd);
        Self { file }
    }
}

impl Read for PipeFd {
    /// Forwards to the wrapped file.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.file, buf)
    }

    /// Forwards to the wrapped file.
    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
        Read::read_to_end(&mut self.file, buf)
    }
}

impl Write for PipeFd {
    /// Forwards to the wrapped file.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Write::write(&mut self.file, buf)
    }

    /// Forwards to the wrapped file.
    fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut self.file)
    }

    /// Forwards to the wrapped file.
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        Write::write_all(&mut self.file, buf)
    }
}
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
@@ -192,14 +290,12 @@ impl fmt::Display for WorkerKind {
}
}
// Some fields are only used for logging, and dead-code analysis ignores Debug.
#[allow(dead_code)]
#[derive(Debug)]
pub struct WorkerInfo {
pid: u32,
kind: WorkerKind,
version: Option<String>,
worker_dir_path: PathBuf,
pub pid: u32,
pub kind: WorkerKind,
pub version: Option<String>,
pub worker_dir_path: PathBuf,
}
// NOTE: The worker version must be passed in so that we accurately get the version of the worker,
@@ -218,7 +314,7 @@ pub fn run_worker<F>(
worker_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(UnixStream, PathBuf) -> io::Result<Never>,
F: FnMut(UnixStream, &WorkerInfo, SecurityStatus) -> io::Result<Never>,
{
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
let mut worker_info = WorkerInfo {
@@ -250,11 +346,8 @@ pub fn run_worker<F>(
}
// Make sure that we can read the worker dir path, and log its contents.
let entries = || -> Result<Vec<_>, io::Error> {
std::fs::read_dir(&worker_info.worker_dir_path)?
.map(|res| res.map(|e| e.file_name()))
.collect()
}();
let entries: io::Result<Vec<_>> = std::fs::read_dir(&worker_info.worker_dir_path)
.and_then(|d| d.map(|res| res.map(|e| e.file_name())).collect());
match entries {
Ok(entries) =>
gum::trace!(target: LOG_TARGET, ?worker_info, "content of worker dir: {:?}", entries),
@@ -284,6 +377,22 @@ pub fn run_worker<F>(
{
gum::trace!(target: LOG_TARGET, ?security_status, "Enabling security features");
// First, make sure env vars were cleared, to match the environment we perform the checks
// within. (In theory, running checks with different env vars could result in different
// outcomes of the checks.)
if !security::check_env_vars_were_cleared(&worker_info) {
let err = "not all env vars were cleared when spawning the process";
gum::error!(
target: LOG_TARGET,
?worker_info,
"{}",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, err);
}
}
// Call based on whether we can change root. Error out if it should work but fails.
//
// NOTE: This should not be called in a multi-threaded context (i.e. inside the tokio
@@ -337,23 +446,10 @@ pub fn run_worker<F>(
}
}
}
if !security::check_env_vars_were_cleared(&worker_info) {
let err = "not all env vars were cleared when spawning the process";
gum::error!(
target: LOG_TARGET,
?worker_info,
"{}",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, err);
}
}
}
// Run the main worker loop.
let err = event_loop(stream, worker_info.worker_dir_path.clone())
let err = event_loop(stream, &worker_info, security_status)
// It's never `Ok` because it's `Ok(Never)`.
.unwrap_err();
@@ -54,8 +54,7 @@ pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> {
///
/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`:
/// "CLONE_NEWUSER requires that the calling process is not threaded."
#[cfg(target_os = "linux")]
pub fn check_is_fully_enabled(tempdir: &Path) -> Result<()> {
pub fn check_can_fully_enable(tempdir: &Path) -> Result<()> {
let worker_dir_path = tempdir.to_owned();
try_restrict(&WorkerInfo {
pid: std::process::id(),
@@ -69,7 +68,6 @@ pub fn check_is_fully_enabled(tempdir: &Path) -> Result<()> {
///
/// NOTE: This should not be called in a multi-threaded context. `unshare(2)`:
/// "CLONE_NEWUSER requires that the calling process is not threaded."
#[cfg(target_os = "linux")]
fn try_restrict(worker_info: &WorkerInfo) -> Result<()> {
// TODO: Remove this once this is stable: https://github.com/rust-lang/rust/issues/105723
macro_rules! cstr_ptr {
@@ -78,12 +76,6 @@ fn try_restrict(worker_info: &WorkerInfo) -> Result<()> {
};
}
gum::trace!(
target: LOG_TARGET,
?worker_info,
"unsharing the user namespace and calling pivot_root",
);
let worker_dir_path_c = CString::new(worker_info.worker_dir_path.as_os_str().as_bytes())
.expect("on unix; the path will never contain 0 bytes; qed");
@@ -0,0 +1,93 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Functionality for securing the job processes spawned by the workers using `clone`. If
//! unsupported, falls back to `fork`.
use crate::{worker::WorkerInfo, LOG_TARGET};
use nix::{
errno::Errno,
sched::{CloneCb, CloneFlags},
unistd::Pid,
};
/// Errors that can occur when spawning a process with `clone`.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The `clone` call itself failed; carries the errno it reported.
#[error("could not clone, errno: {0}")]
Clone(Errno),
}
pub type Result<T> = std::result::Result<T, Error>;
/// Spawns a new process from the current worker via clone(2), using the sandboxing flags.
///
/// `have_unshare_newuser` tells whether the user namespace was already unshared; it decides
/// whether `CLONE_NEWUSER` is part of the flags. `cb` is the entry point run in the new process.
/// The chosen flags are logged at trace level together with `worker_info`.
///
/// # Safety
///
/// The new process should be either spawned within a single threaded process, or use only
/// async-signal-safe functions.
pub unsafe fn clone_on_worker(
    worker_info: &WorkerInfo,
    have_unshare_newuser: bool,
    cb: CloneCb,
) -> Result<Pid> {
    let sandbox_flags = clone_flags(have_unshare_newuser);

    gum::trace!(
        target: LOG_TARGET,
        ?worker_info,
        "calling clone with flags: {:?}",
        sandbox_flags
    );

    try_clone(cb, sandbox_flags)
}
/// Probes whether clone(2) with all sandboxing flags works on the current Linux environment.
///
/// The probe clones a process whose entry point immediately returns `0`, and always requests
/// `CLONE_NEWUSER`. Returns `Ok(())` if the clone succeeded and the clone error otherwise.
///
/// # Safety
///
/// The new process should be either spawned within a single threaded process, or use only
/// async-signal-safe functions.
pub unsafe fn check_can_fully_clone() -> Result<()> {
    let probe_flags = clone_flags(false);
    try_clone(Box::new(|| 0), probe_flags)?;
    Ok(())
}
/// Runs clone(2) with all sandboxing flags.
///
/// SAFETY: new process should be either spawned within a single threaded process, or use only
/// async-signal-safe functions.
unsafe fn try_clone(cb: CloneCb, flags: CloneFlags) -> Result<Pid> {
let mut stack = [0u8; 2 * 1024 * 1024];
nix::sched::clone(cb, stack.as_mut_slice(), flags, None).map_err(|errno| Error::Clone(errno))
}
/// Builds the flag set passed to `clone(2)`, including all the sandbox-related ones.
///
/// `have_unshare_newuser` states whether `unshare` was already called with `CLONE_NEWUSER`.
fn clone_flags(have_unshare_newuser: bool) -> CloneFlags {
    // Namespaces that are always unshared for the new process.
    let mut flags = CloneFlags::CLONE_NEWCGROUP |
        CloneFlags::CLONE_NEWIPC |
        CloneFlags::CLONE_NEWNET |
        CloneFlags::CLONE_NEWNS |
        CloneFlags::CLONE_NEWPID |
        CloneFlags::CLONE_NEWUTS;

    // NOTE: CLONE_NEWUSER does not work in `clone` if we previously called `unshare` with this
    // flag. On the other hand, if we did not call `unshare` we need this flag for the
    // CAP_SYS_ADMIN capability.
    if !have_unshare_newuser {
        flags |= CloneFlags::CLONE_NEWUSER;
    }

    // The SIGCHLD flag informs clone that the parent process is expecting a child termination
    // signal; without it, `waitpid` returns an `ECHILD` error.
    flags |= CloneFlags::from_bits_retain(libc::SIGCHLD);

    flags
}
@@ -112,7 +112,7 @@ pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> {
// TODO: <https://github.com/landlock-lsm/rust-landlock/issues/36>
/// Runs a check for landlock in its own thread, and returns an error indicating whether the given
/// landlock ABI is fully enabled on the current Linux environment.
pub fn check_is_fully_enabled() -> Result<()> {
pub fn check_can_fully_enable() -> Result<()> {
match std::thread::spawn(|| try_restrict(std::iter::empty::<(PathBuf, AccessFs)>())).join() {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
@@ -165,7 +165,7 @@ mod tests {
#[test]
fn restricted_thread_cannot_read_file() {
// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
if check_is_fully_enabled().is_err() {
if check_can_fully_enable().is_err() {
return
}
@@ -230,7 +230,7 @@ mod tests {
#[test]
fn restricted_thread_cannot_write_file() {
// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
if check_is_fully_enabled().is_err() {
if check_can_fully_enable().is_err() {
return
}
@@ -289,7 +289,7 @@ mod tests {
#[test]
fn restricted_thread_can_truncate_file() {
// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
if check_is_fully_enabled().is_err() {
if check_can_fully_enable().is_err() {
return
}
@@ -27,15 +27,17 @@
//! - Restrict networking by blocking socket creation and io_uring.
//! - Remove env vars
use crate::{worker::WorkerInfo, LOG_TARGET};
#[cfg(target_os = "linux")]
pub mod change_root;
#[cfg(target_os = "linux")]
pub mod clone;
#[cfg(target_os = "linux")]
pub mod landlock;
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub mod seccomp;
use crate::{worker::WorkerInfo, LOG_TARGET};
/// Require env vars to have been removed when spawning the process, to prevent malicious code from
/// accessing them.
pub fn check_env_vars_were_cleared(worker_info: &WorkerInfo) -> bool {
@@ -110,7 +110,7 @@ pub fn enable_for_worker(worker_info: &WorkerInfo) -> Result<()> {
/// Runs a check for seccomp in its own thread, and returns an error indicating whether seccomp with
/// our rules is fully enabled on the current Linux environment.
pub fn check_is_fully_enabled() -> Result<()> {
pub fn check_can_fully_enable() -> Result<()> {
match std::thread::spawn(|| try_restrict()).join() {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
@@ -161,7 +161,7 @@ mod tests {
#[test]
fn sandboxed_thread_cannot_use_sockets() {
// TODO: This would be nice: <https://github.com/rust-lang/rust/issues/68007>.
if check_is_fully_enabled().is_err() {
if check_can_fully_enable().is_err() {
return
}
@@ -12,8 +12,8 @@ workspace = true
[dependencies]
cpu-time = "1.0.0"
gum = { package = "tracing-gum", path = "../../../gum" }
os_pipe = "1.1.4"
nix = { version = "0.27.1", features = ["process", "resource"] }
cfg-if = "1.0"
nix = { version = "0.27.1", features = ["process", "resource", "sched"] }
libc = "0.2.152"
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
+254 -126
View File
@@ -16,7 +16,7 @@
//! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.
pub use polkadot_node_core_pvf_common::{executor_interface::execute_artifact, worker_dir};
pub use polkadot_node_core_pvf_common::executor_interface::execute_artifact;
// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`.
@@ -31,64 +31,41 @@ use nix::{
},
unistd::{ForkResult, Pid},
};
use os_pipe::{self, PipeReader, PipeWriter};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse},
executor_interface::params_to_wasmtime_semantics,
framed_recv_blocking, framed_send_blocking,
worker::{
cpu_time_monitor_loop, run_worker, stringify_panic_payload,
cpu_time_monitor_loop, pipe2_cloexec, run_worker, stringify_panic_payload,
thread::{self, WaitOutcome},
WorkerKind,
PipeFd, WorkerInfo, WorkerKind,
},
worker_dir,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::{executor_params::DEFAULT_NATIVE_STACK_MAX, ExecutorParams};
use polkadot_primitives::ExecutorParams;
use std::{
io::{self, Read},
os::unix::net::UnixStream,
os::{
fd::{AsRawFd, FromRawFd},
unix::net::UnixStream,
},
path::PathBuf,
process,
sync::{mpsc::channel, Arc},
time::Duration,
};
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check that
// the stack space is actually available.
//
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
// guard page and the Rust stack overflow handler will be triggered. That leads to an
// **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
// will abort and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// DEFAULT_NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
//
// Hence we need to increase it. The simplest way to fix that is to spawn a thread with the desired
// stack limit.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
/// The stack size for the execute thread.
pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + DEFAULT_NATIVE_STACK_MAX as usize;
/// The number of threads for the child process:
/// 1 - Main thread
/// 2 - Cpu monitor thread
/// 3 - Execute thread
///
/// NOTE: The correctness of this value is enforced by a test. If the number of threads inside
/// the child process changes in the future, this value must be changed as well.
pub const EXECUTE_WORKER_THREAD_NUMBER: u32 = 3;
/// Receives a handshake with information specific to the execute worker.
fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
@@ -145,17 +122,20 @@ pub fn worker_entrypoint(
worker_dir_path,
node_version,
worker_version,
|mut stream, worker_dir_path| {
let worker_pid = process::id();
let artifact_path = worker_dir::execute_artifact(&worker_dir_path);
|mut stream, worker_info, security_status| {
let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path);
let Handshake { executor_params } = recv_execute_handshake(&mut stream)?;
let executor_params: Arc<ExecutorParams> = Arc::new(executor_params);
let execute_thread_stack_size = max_stack_size(&executor_params);
loop {
let (params, execution_timeout) = recv_request(&mut stream)?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
?security_status,
"worker: validating artifact {}",
artifact_path.display(),
);
@@ -172,7 +152,7 @@ pub fn worker_entrypoint(
},
};
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?;
let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
@@ -182,44 +162,65 @@ pub fn worker_entrypoint(
continue
},
};
let stream_fd = stream.as_raw_fd();
// SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests.
let response = match unsafe { nix::unistd::fork() } {
Err(errno) => internal_error_from_errno("fork", errno),
Ok(ForkResult::Child) => {
// Dropping the stream closes the underlying socket. We want to make sure
// that the sandboxed child can't get any kind of information from the
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
drop(stream);
// Drop the read end so we don't have too many FDs open.
drop(pipe_reader);
let compiled_artifact_blob = Arc::new(compiled_artifact_blob);
let params = Arc::new(params);
handle_child_process(
pipe_writer,
compiled_artifact_blob,
executor_params,
params,
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let result = if security_status.can_do_secure_clone {
handle_clone(
pipe_write_fd,
pipe_read_fd,
stream_fd,
&compiled_artifact_blob,
&executor_params,
&params,
execution_timeout,
execute_thread_stack_size,
worker_info,
security_status.can_unshare_user_namespace_and_change_root,
usage_before,
)?
} else {
// Fall back to using fork.
handle_fork(
pipe_write_fd,
pipe_read_fd,
stream_fd,
&compiled_artifact_blob,
&executor_params,
&params,
execution_timeout,
execute_thread_stack_size,
worker_info,
usage_before,
)?
};
} else {
let result = handle_fork(
pipe_write_fd,
pipe_read_fd,
stream_fd,
&compiled_artifact_blob,
&executor_params,
&params,
execution_timeout,
)
},
Ok(ForkResult::Parent { child }) => {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
drop(pipe_writer);
handle_parent_process(
pipe_reader,
child,
worker_pid,
execute_thread_stack_size,
worker_info,
usage_before,
execution_timeout,
)?
},
};
)?;
}
}
send_response(&mut stream, response)?;
gum::trace!(
target: LOG_TARGET,
?worker_info,
"worker: sending result to host: {:?}",
result
);
send_response(&mut stream, result)?;
}
},
);
@@ -252,39 +253,122 @@ fn validate_using_artifact(
JobResponse::Ok { result_descriptor }
}
/// Runs one execution job in a child process created with `clone` (Linux only).
///
/// The child runs `handle_child_process` with its own `Arc` handles to the artifact, executor
/// params and validation params; the parent then runs `handle_parent_process` with the child's
/// pid. `have_unshare_newuser` is forwarded to `clone_on_worker`.
///
/// # Returns
///
/// The result of `handle_parent_process` when the clone succeeds. If the clone itself fails,
/// the errno is turned into a response via `internal_error_from_errno` and returned as `Ok`.
#[cfg(target_os = "linux")]
fn handle_clone(
pipe_write_fd: i32,
pipe_read_fd: i32,
stream_fd: i32,
compiled_artifact_blob: &Arc<Vec<u8>>,
executor_params: &Arc<ExecutorParams>,
params: &Arc<Vec<u8>>,
execution_timeout: Duration,
execute_stack_size: usize,
worker_info: &WorkerInfo,
have_unshare_newuser: bool,
usage_before: Usage,
) -> io::Result<WorkerResponse> {
use polkadot_node_core_pvf_common::worker::security;
// SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests. The stack size is specified to ensure the child doesn't overflow.
match unsafe {
security::clone::clone_on_worker(
worker_info,
have_unshare_newuser,
// Entry point of the cloned child process.
Box::new(|| {
handle_child_process(
pipe_write_fd,
pipe_read_fd,
stream_fd,
Arc::clone(compiled_artifact_blob),
Arc::clone(executor_params),
Arc::clone(params),
execution_timeout,
execute_stack_size,
)
}),
)
} {
// Parent side: `child` is the pid of the cloned job process.
Ok(child) => handle_parent_process(
pipe_read_fd,
pipe_write_fd,
worker_info,
child,
usage_before,
execution_timeout,
),
// A failed clone is reported to the caller as a response, not as an `io::Error`.
Err(security::clone::Error::Clone(errno)) => Ok(internal_error_from_errno("clone", errno)),
}
}
/// Runs one execution job in a child process created with `fork`.
///
/// The child runs `handle_child_process` with its own `Arc` handles to the artifact, executor
/// params and validation params, and never returns from it. The parent runs
/// `handle_parent_process` with the child's pid.
///
/// # Returns
///
/// The result of `handle_parent_process` when the fork succeeds. If the fork itself fails, the
/// errno is turned into a response via `internal_error_from_errno` and returned as `Ok`.
fn handle_fork(
    pipe_write_fd: i32,
    pipe_read_fd: i32,
    stream_fd: i32,
    compiled_artifact_blob: &Arc<Vec<u8>>,
    executor_params: &Arc<ExecutorParams>,
    params: &Arc<Vec<u8>>,
    execution_timeout: Duration,
    execute_worker_stack_size: usize,
    worker_info: &WorkerInfo,
    usage_before: Usage,
) -> io::Result<WorkerResponse> {
    // SAFETY: new process is spawned within a single threaded process. This invariant
    // is enforced by tests.
    let fork_outcome = unsafe { nix::unistd::fork() };

    let job_pid = match fork_outcome {
        Err(errno) => return Ok(internal_error_from_errno("fork", errno)),
        // Child side: diverges, so this arm never produces a pid.
        Ok(ForkResult::Child) => handle_child_process(
            pipe_write_fd,
            pipe_read_fd,
            stream_fd,
            Arc::clone(compiled_artifact_blob),
            Arc::clone(executor_params),
            Arc::clone(params),
            execution_timeout,
            execute_worker_stack_size,
        ),
        Ok(ForkResult::Parent { child }) => child,
    };

    // Parent side: wait for the job and collect its response.
    handle_parent_process(
        pipe_read_fd,
        pipe_write_fd,
        worker_info,
        job_pid,
        usage_before,
        execution_timeout,
    )
}
/// This is used to handle child process during pvf execute worker.
/// It execute the artifact and pipes back the response to the parent process
///
/// # Arguments
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
///
/// - `compiled_artifact_blob`: The artifact bytes from compiled by the prepare worker`.
///
/// - `executor_params`: Deterministically serialized execution environment semantics.
///
/// - `params`: Validation parameters.
///
/// - `execution_timeout`: The timeout in `Duration`.
/// It executes the artifact and pipes back the response to the parent process.
///
/// # Returns
///
/// - pipe back `JobResponse` to the parent process.
fn handle_child_process(
mut pipe_write: PipeWriter,
compiled_artifact_blob: Vec<u8>,
executor_params: ExecutorParams,
params: Vec<u8>,
pipe_write_fd: i32,
pipe_read_fd: i32,
stream_fd: i32,
compiled_artifact_blob: Arc<Vec<u8>>,
executor_params: Arc<ExecutorParams>,
params: Arc<Vec<u8>>,
execution_timeout: Duration,
execute_thread_stack_size: usize,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSetPdeathsig(err.to_string())))
});
// SAFETY: this is an open and owned file descriptor at this point.
let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };
// Drop the read end so we don't have too many FDs open.
if let Err(errno) = nix::unistd::close(pipe_read_fd) {
send_child_response(&mut pipe_write, job_error_from_errno("closing pipe", errno));
}
// Dropping the stream closes the underlying socket. We want to make sure
// that the sandboxed child can't get any kind of information from the
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
if let Err(errno) = nix::unistd::close(stream_fd) {
send_child_response(&mut pipe_write, job_error_from_errno("closing stream", errno));
}
gum::debug!(
target: LOG_TARGET,
@@ -308,13 +392,12 @@ fn handle_child_process(
send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
});
let executor_params_2 = executor_params.clone();
let execute_thread = thread::spawn_worker_thread_with_stack_size(
"execute thread",
move || validate_using_artifact(&compiled_artifact_blob, &executor_params_2, &params),
move || validate_using_artifact(&compiled_artifact_blob, &executor_params, &params),
Arc::clone(&condvar),
WaitOutcome::Finished,
EXECUTE_THREAD_STACK_SIZE,
execute_thread_stack_size,
)
.unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSpawnThread(err.to_string())))
@@ -343,28 +426,69 @@ fn handle_child_process(
send_child_response(&mut pipe_write, response);
}
/// Returns the stack size, in bytes, to use for executing a job with the given executor params.
///
/// The size is 2 MiB (the default Rust thread stack size) plus the native stack limit
/// (`native_stack_max`) that `params_to_wasmtime_semantics` derives from `executor_params`.
///
/// # Background
///
/// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
/// That native code does not create any stacks and just reuses the stack of the thread that
/// wasmtime was invoked from.
///
/// Also, we configure the executor to provide the deterministic stack and that requires
/// supplying the amount of the native stack space that wasm is allowed to use. This is
/// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
///
/// There are quirks to that configuration knob:
///
/// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check that
///    the stack space is actually available.
///
///    That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
///    more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
///    guard page and the Rust stack overflow handler will be triggered. That leads to an
///    **abort**.
///
/// 2. It cannot and does not limit the stack space consumed by Rust code.
///
///    Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
///    will abort and that will abort the process as well.
///
/// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
/// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
/// DEFAULT_NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
///
/// Hence we need to increase it. The simplest way to fix that is to spawn an execute thread with
/// the desired stack limit. We must also make sure the job process has enough stack for *all* its
/// threads. This function can be used to get the stack size of either the execute thread or execute
/// job process.
fn max_stack_size(executor_params: &ExecutorParams) -> usize {
    /// Stack headroom reserved for Rust code on top of the wasm native stack.
    const RUST_STACK_HEADROOM: usize = 2 * 1024 * 1024;

    let (_sem, deterministic_stack_limit) = params_to_wasmtime_semantics(executor_params);
    // Widen to `usize` *before* adding: summing in the narrower integer type of
    // `native_stack_max` could overflow for limits close to that type's maximum.
    RUST_STACK_HEADROOM + deterministic_stack_limit.native_stack_max as usize
}
/// Waits for child process to finish and handle child response from pipe.
///
/// # Arguments
///
/// - `pipe_read`: A `PipeReader` used to read data from the child process.
///
/// - `child`: The child pid.
///
/// - `usage_before`: Resource usage statistics before executing the child process.
///
/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`.
///
/// # Returns
///
/// - The response, either `Ok` or some error state.
fn handle_parent_process(
mut pipe_read: PipeReader,
pipe_read_fd: i32,
pipe_write_fd: i32,
worker_info: &WorkerInfo,
job_pid: Pid,
worker_pid: u32,
usage_before: Usage,
timeout: Duration,
) -> io::Result<WorkerResponse> {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
if let Err(errno) = nix::unistd::close(pipe_write_fd) {
return Ok(internal_error_from_errno("closing pipe write fd", errno));
};
// SAFETY: pipe_read_fd is an open and owned file descriptor at this point.
let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
// Read from the child. Don't decode unless the process exited normally, which we check later.
let mut received_data = Vec::new();
pipe_read
@@ -376,7 +500,7 @@ fn handle_parent_process(
let status = nix::sys::wait::waitpid(job_pid, None);
gum::trace!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
%job_pid,
"execute worker received wait status from job: {:?}",
status,
@@ -396,7 +520,7 @@ fn handle_parent_process(
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
%job_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_tv.as_millis(),
@@ -429,7 +553,7 @@ fn handle_parent_process(
Err(job_error) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
%job_pid,
"execute job error: {}",
job_error,
@@ -490,14 +614,14 @@ fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<J
})
}
/// Write response to the pipe and exit process after.
/// Write a job response to the pipe and exit process after.
///
/// # Arguments
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
/// - `pipe_write`: A `PipeFd` structure, the writing end of a pipe.
///
/// - `response`: Child process response, or error.
fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
/// - `response`: Child process response
fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
framed_send_blocking(pipe_write, response.encode().as_slice())
.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
@@ -516,3 +640,7 @@ fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerRespo
io::Error::last_os_error()
)))
}
/// Builds a failed [`JobResult`] carrying a [`JobError::Kernel`] message.
///
/// The message has the form `<context>: <errno>: <last OS error>`, where the last part is
/// whatever `io::Error::last_os_error()` reports at the time of the call.
fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult {
	let last_os_error = io::Error::last_os_error();
	let message = format!("{}: {}: {}", context, errno, last_os_error);
	Err(JobError::Kernel(message))
}
@@ -18,8 +18,7 @@ rayon = "1.5.1"
tracking-allocator = { package = "staging-tracking-allocator", path = "../../../tracking-allocator" }
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tikv-jemallocator = { version = "0.5.0", optional = true }
os_pipe = "1.1.4"
nix = { version = "0.27.1", features = ["process", "resource"] }
nix = { version = "0.27.1", features = ["process", "resource", "sched"] }
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
+196 -88
View File
@@ -18,8 +18,6 @@
mod memory_stats;
use polkadot_node_core_pvf_common::executor_interface::{prepare, prevalidate};
// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`.
/// Log target passed as `target:` to the `gum` logging macros in this worker.
const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
@@ -37,7 +35,11 @@ use nix::{
},
unistd::{ForkResult, Pid},
};
use os_pipe::{self, PipeReader, PipeWriter};
use polkadot_node_core_pvf_common::{
executor_interface::{prepare, prevalidate},
worker::{pipe2_cloexec, PipeFd, WorkerInfo},
};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareWorkerResult},
@@ -57,10 +59,10 @@ use std::{
fs,
io::{self, Read},
os::{
fd::{AsRawFd, RawFd},
fd::{AsRawFd, FromRawFd, RawFd},
unix::net::UnixStream,
},
path::PathBuf,
path::{Path, PathBuf},
process,
sync::{mpsc::channel, Arc},
time::Duration,
@@ -76,6 +78,16 @@ static ALLOC: TrackingAllocator<tikv_jemallocator::Jemalloc> =
#[global_allocator]
static ALLOC: TrackingAllocator<std::alloc::System> = TrackingAllocator(std::alloc::System);
/// The number of threads running in the prepare job (child) process:
///
/// 1. Main thread
/// 2. CPU monitor thread
/// 3. Memory tracker thread
/// 4. Prepare thread
///
/// NOTE: The correctness of this value is enforced by a test. If the number of threads inside
/// the child process changes in the future, this value must be changed as well.
pub const PREPARE_WORKER_THREAD_NUMBER: u32 = 4;
/// Contains the bytes for a successfully compiled artifact.
///
/// This is a newtype around a `Vec<u8>` whose inner field is private. It derives `Encode` and
/// `Decode`, so it can be SCALE-encoded and decoded.
#[derive(Encode, Decode)]
pub struct CompiledArtifact(Vec<u8>);
@@ -200,15 +212,15 @@ pub fn worker_entrypoint(
worker_dir_path,
node_version,
worker_version,
|mut stream, worker_dir_path| {
let worker_pid = process::id();
let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path);
|mut stream, worker_info, security_status| {
let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_info.worker_dir_path);
loop {
let pvf = recv_request(&mut stream)?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
?security_status,
"worker: preparing artifact",
);
@@ -216,7 +228,7 @@ pub fn worker_entrypoint(
let prepare_job_kind = pvf.prep_kind();
let executor_params = pvf.executor_params();
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?;
let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
@@ -227,46 +239,58 @@ pub fn worker_entrypoint(
},
};
// SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests.
let result = match unsafe { nix::unistd::fork() } {
Err(errno) => Err(error_from_errno("fork", errno)),
Ok(ForkResult::Child) => {
// Dropping the stream closes the underlying socket. We want to make sure
// that the sandboxed child can't get any kind of information from the
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
drop(stream);
// Drop the read end so we don't have too many FDs open.
drop(pipe_reader);
let stream_fd = stream.as_raw_fd();
handle_child_process(
pvf,
pipe_writer,
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let result = if security_status.can_do_secure_clone {
handle_clone(
&pvf,
pipe_write_fd,
pipe_read_fd,
stream_fd,
preparation_timeout,
prepare_job_kind,
&executor_params,
worker_info,
security_status.can_unshare_user_namespace_and_change_root,
&temp_artifact_dest,
usage_before,
)
} else {
// Fall back to using fork.
handle_fork(
&pvf,
pipe_write_fd,
pipe_read_fd,
stream_fd,
preparation_timeout,
prepare_job_kind,
&executor_params,
worker_info,
&temp_artifact_dest,
usage_before,
)
};
} else {
let result = handle_fork(
&pvf,
pipe_write_fd,
pipe_read_fd,
stream_fd,
preparation_timeout,
prepare_job_kind,
executor_params,
)
},
Ok(ForkResult::Parent { child }) => {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
drop(pipe_writer);
handle_parent_process(
pipe_reader,
worker_pid,
child,
temp_artifact_dest.clone(),
&executor_params,
worker_info,
&temp_artifact_dest,
usage_before,
preparation_timeout,
)
},
};
);
}
}
gum::trace!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
"worker: sending result to host: {:?}",
result
);
@@ -306,21 +330,94 @@ struct JobResponse {
memory_stats: MemoryStats,
}
/// Spawns the prepare job process via `security::clone::clone_on_worker` and then handles the
/// parent's side of the job.
///
/// The spawned child runs [`handle_child_process`] (which never returns) with its own clone of
/// `pvf` and `executor_params`. In the parent, the outcome is obtained from
/// [`handle_parent_process`].
///
/// # Arguments
///
/// - `pvf`: The PVF data to prepare; cloned for the child.
///
/// - `pipe_write_fd` / `pipe_read_fd`: Raw file descriptors of the two ends of the pipe between
///   the child and the parent. Both are forwarded to the child as well as to the parent handler.
///
/// - `stream_fd`: Raw file descriptor of the worker's stream, forwarded to the child.
///
/// - `preparation_timeout`: The timeout for the preparation job.
///
/// - `prepare_job_kind`: The kind of prepare job.
///
/// - `executor_params`: Executor parameters; the `Arc` is cloned for the child.
///
/// - `worker_info`: Information about this worker, passed to `clone_on_worker` and used by the
///   parent handler.
///
/// - `have_unshare_newuser`: Forwarded as-is to `clone_on_worker`.
///
/// - `temp_artifact_dest`: Destination path handed to the parent handler.
///
/// - `usage_before`: Resource usage measured before the job was spawned.
///
/// # Returns
///
/// - The result of [`handle_parent_process`] if the clone succeeded.
///
/// - An error built by `error_from_errno("clone", ..)` if the clone failed.
#[cfg(target_os = "linux")]
fn handle_clone(
	pvf: &PvfPrepData,
	pipe_write_fd: i32,
	pipe_read_fd: i32,
	stream_fd: i32,
	preparation_timeout: Duration,
	prepare_job_kind: PrepareJobKind,
	executor_params: &Arc<ExecutorParams>,
	worker_info: &WorkerInfo,
	have_unshare_newuser: bool,
	temp_artifact_dest: &Path,
	usage_before: Usage,
) -> Result<PrepareWorkerSuccess, PrepareError> {
	use polkadot_node_core_pvf_common::worker::security;

	// SAFETY: new process is spawned within a single threaded process. This invariant
	// is enforced by tests. Stack size being specified to ensure child doesn't overflow
	match unsafe {
		security::clone::clone_on_worker(
			worker_info,
			have_unshare_newuser,
			Box::new(|| {
				handle_child_process(
					pvf.clone(),
					pipe_write_fd,
					pipe_read_fd,
					stream_fd,
					preparation_timeout,
					prepare_job_kind,
					// `executor_params` is already a reference to the `Arc`; no extra borrow
					// is needed (same form as in `handle_fork`).
					Arc::clone(executor_params),
				)
			}),
		)
	} {
		Ok(child) => handle_parent_process(
			pipe_read_fd,
			pipe_write_fd,
			worker_info,
			child,
			temp_artifact_dest,
			usage_before,
			preparation_timeout,
		),
		Err(security::clone::Error::Clone(errno)) => Err(error_from_errno("clone", errno)),
	}
}
/// Spawns the prepare job process with a plain `fork` and then handles the parent's side of
/// the job.
///
/// In the child, control is handed to [`handle_child_process`], which never returns. In the
/// parent, the result of [`handle_parent_process`] is returned. If the fork itself fails, the
/// error is built by `error_from_errno("fork", ..)`.
fn handle_fork(
	pvf: &PvfPrepData,
	pipe_write_fd: i32,
	pipe_read_fd: i32,
	stream_fd: i32,
	preparation_timeout: Duration,
	prepare_job_kind: PrepareJobKind,
	executor_params: &Arc<ExecutorParams>,
	worker_info: &WorkerInfo,
	temp_artifact_dest: &Path,
	usage_before: Usage,
) -> Result<PrepareWorkerSuccess, PrepareError> {
	// SAFETY: new process is spawned within a single threaded process. This invariant
	// is enforced by tests.
	let fork_result = unsafe { nix::unistd::fork() };

	// Only the parent gets past this `match`: the child branch diverges and a failed fork
	// returns early.
	let job_pid = match fork_result {
		Err(errno) => return Err(error_from_errno("fork", errno)),
		Ok(ForkResult::Child) => handle_child_process(
			pvf.clone(),
			pipe_write_fd,
			pipe_read_fd,
			stream_fd,
			preparation_timeout,
			prepare_job_kind,
			Arc::clone(executor_params),
		),
		Ok(ForkResult::Parent { child }) => child,
	};

	handle_parent_process(
		pipe_read_fd,
		pipe_write_fd,
		worker_info,
		job_pid,
		temp_artifact_dest,
		usage_before,
		preparation_timeout,
	)
}
/// This is used to handle child process during pvf prepare worker.
/// It prepares the artifact and tracks memory stats during preparation
/// and pipes back the response to the parent process
///
/// # Arguments
///
/// - `pvf`: `PvfPrepData` structure, containing data to prepare the artifact
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
///
/// - `preparation_timeout`: The timeout in `Duration`.
///
/// - `prepare_job_kind`: The kind of prepare job.
///
/// - `executor_params`: Deterministically serialized execution environment semantics.
/// and pipes back the response to the parent process.
///
/// # Returns
///
@@ -329,19 +426,34 @@ struct JobResponse {
/// - If success, pipe back `JobResponse`.
fn handle_child_process(
pvf: PvfPrepData,
mut pipe_write: PipeWriter,
pipe_write_fd: i32,
pipe_read_fd: i32,
stream_fd: i32,
preparation_timeout: Duration,
prepare_job_kind: PrepareJobKind,
executor_params: Arc<ExecutorParams>,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
});
// SAFETY: pipe_writer is an open and owned file descriptor at this point.
let mut pipe_write = unsafe { PipeFd::from_raw_fd(pipe_write_fd) };
// Drop the read end so we don't have too many FDs open.
if let Err(errno) = nix::unistd::close(pipe_read_fd) {
send_child_response(
&mut pipe_write,
JobResult::Err(error_from_errno("closing pipe", errno)),
);
}
// Dropping the stream closes the underlying socket. We want to make sure
// that the sandboxed child can't get any kind of information from the
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
if let Err(errno) = nix::unistd::close(stream_fd) {
send_child_response(
&mut pipe_write,
JobResult::Err(error_from_errno("error closing stream", errno)),
);
}
let worker_job_pid = process::id();
gum::debug!(
@@ -489,20 +601,6 @@ fn handle_child_process(
/// Waits for child process to finish and handle child response from pipe.
///
/// # Arguments
///
/// - `pipe_read`: A `PipeReader` used to read data from the child process.
///
/// - `child`: The child pid.
///
/// - `temp_artifact_dest`: The destination `PathBuf` to write the temporary artifact file.
///
/// - `worker_pid`: The PID of the child process.
///
/// - `usage_before`: Resource usage statistics before executing the child process.
///
/// - `timeout`: The maximum allowed time for the child process to finish, in `Duration`.
///
/// # Returns
///
/// - If the child send response without an error, this function returns `Ok(PrepareStats)`
@@ -512,13 +610,23 @@ fn handle_child_process(
///
/// - If the child process timeout, it returns `PrepareError::TimedOut`.
fn handle_parent_process(
mut pipe_read: PipeReader,
worker_pid: u32,
pipe_read_fd: i32,
pipe_write_fd: i32,
worker_info: &WorkerInfo,
job_pid: Pid,
temp_artifact_dest: PathBuf,
temp_artifact_dest: &Path,
usage_before: Usage,
timeout: Duration,
) -> Result<PrepareWorkerSuccess, PrepareError> {
// the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock
if let Err(errno) = nix::unistd::close(pipe_write_fd) {
return Err(error_from_errno("closing pipe write fd", errno));
};
// SAFETY: this is an open and owned file descriptor at this point.
let mut pipe_read = unsafe { PipeFd::from_raw_fd(pipe_read_fd) };
// Read from the child. Don't decode unless the process exited normally, which we check later.
let mut received_data = Vec::new();
pipe_read
@@ -528,7 +636,7 @@ fn handle_parent_process(
let status = nix::sys::wait::waitpid(job_pid, None);
gum::trace!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
%job_pid,
"prepare worker received wait status from job: {:?}",
status,
@@ -546,7 +654,7 @@ fn handle_parent_process(
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
%job_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_tv.as_millis(),
@@ -581,13 +689,13 @@ fn handle_parent_process(
// success.
gum::debug!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
%job_pid,
"worker: writing artifact to {}",
temp_artifact_dest.display(),
);
// Write to the temp file created by the host.
if let Err(err) = fs::write(&temp_artifact_dest, &artifact) {
if let Err(err) = fs::write(temp_artifact_dest, &artifact) {
return Err(PrepareError::IoErr(err.to_string()))
};
@@ -651,10 +759,10 @@ fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<J
///
/// # Arguments
///
/// - `pipe_write`: A `PipeWriter` structure, the writing end of a pipe.
/// - `pipe_write`: A `PipeFd` structure, the writing end of a pipe.
///
/// - `response`: Child process response
fn send_child_response(pipe_write: &mut PipeWriter, response: JobResult) -> ! {
fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
framed_send_blocking(pipe_write, response.encode().as_slice())
.unwrap_or_else(|_| process::exit(libc::EXIT_FAILURE));
+1 -1
View File
@@ -906,7 +906,7 @@ pub(crate) mod tests {
let _ = pulse.next().await.unwrap();
let el = start.elapsed().as_millis();
assert!(el > 50 && el < 150, "{}", el);
assert!(el > 50 && el < 150, "pulse duration: {}", el);
}
}
+130 -96
View File
@@ -32,14 +32,20 @@ use std::{fmt, path::Path};
pub async fn check_security_status(config: &Config) -> Result<SecurityStatus, String> {
let Config { prepare_worker_program_path, secure_validator_mode, cache_path, .. } = config;
let (landlock, seccomp, change_root) = join!(
let (landlock, seccomp, change_root, secure_clone) = join!(
check_landlock(prepare_worker_program_path),
check_seccomp(prepare_worker_program_path),
check_can_unshare_user_namespace_and_change_root(prepare_worker_program_path, cache_path)
check_can_unshare_user_namespace_and_change_root(prepare_worker_program_path, cache_path),
check_can_do_secure_clone(prepare_worker_program_path),
);
let full_security_status =
FullSecurityStatus::new(*secure_validator_mode, landlock, seccomp, change_root);
let full_security_status = FullSecurityStatus::new(
*secure_validator_mode,
landlock,
seccomp,
change_root,
secure_clone,
);
let security_status = full_security_status.as_partial();
if full_security_status.err_occurred() {
@@ -73,6 +79,7 @@ impl FullSecurityStatus {
landlock: SecureModeResult,
seccomp: SecureModeResult,
change_root: SecureModeResult,
secure_clone: SecureModeResult,
) -> Self {
Self {
partial: SecurityStatus {
@@ -80,8 +87,9 @@ impl FullSecurityStatus {
can_enable_landlock: landlock.is_ok(),
can_enable_seccomp: seccomp.is_ok(),
can_unshare_user_namespace_and_change_root: change_root.is_ok(),
can_do_secure_clone: secure_clone.is_ok(),
},
errs: [landlock, seccomp, change_root]
errs: [landlock, seccomp, change_root, secure_clone]
.into_iter()
.filter_map(|result| result.err())
.collect(),
@@ -120,9 +128,10 @@ type SecureModeResult = std::result::Result<(), SecureModeError>;
/// Errors related to enabling Secure Validator Mode.
#[derive(Debug)]
enum SecureModeError {
CannotEnableLandlock(String),
CannotEnableLandlock { err: String, abi: u8 },
CannotEnableSeccomp(String),
CannotUnshareUserNamespaceAndChangeRoot(String),
CannotDoSecureClone(String),
}
impl SecureModeError {
@@ -132,12 +141,16 @@ impl SecureModeError {
match self {
// Landlock is present on relatively recent Linuxes. This is optional if the unshare
// capability is present, providing FS sandboxing a different way.
CannotEnableLandlock(_) => security_status.can_unshare_user_namespace_and_change_root,
CannotEnableLandlock { .. } =>
security_status.can_unshare_user_namespace_and_change_root,
// seccomp should be present on all modern Linuxes unless it's been disabled.
CannotEnableSeccomp(_) => false,
// Should always be present on modern Linuxes. If not, Landlock also provides FS
// sandboxing, so don't enforce this.
CannotUnshareUserNamespaceAndChangeRoot(_) => security_status.can_enable_landlock,
// We have not determined the kernel requirements for this capability, and it's also not
// necessary for FS or networking restrictions.
CannotDoSecureClone(_) => true,
}
}
}
@@ -146,9 +159,10 @@ impl fmt::Display for SecureModeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use SecureModeError::*;
match self {
CannotEnableLandlock(err) => write!(f, "Cannot enable landlock, a Linux 5.13+ kernel security feature: {err}"),
CannotEnableLandlock{err, abi} => write!(f, "Cannot enable landlock (ABI {abi}), a Linux 5.13+ kernel security feature: {err}"),
CannotEnableSeccomp(err) => write!(f, "Cannot enable seccomp, a Linux-specific kernel security feature: {err}"),
CannotUnshareUserNamespaceAndChangeRoot(err) => write!(f, "Cannot unshare user namespace and change root, which are Linux-specific kernel security features: {err}"),
CannotDoSecureClone(err) => write!(f, "Cannot call clone with all sandboxing flags, a Linux-specific kernel security features: {err}"),
}
}
}
@@ -208,32 +222,11 @@ async fn check_can_unshare_user_namespace_and_change_root(
.map_err(|err| SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
format!("could not create a temporary directory in {:?}: {}", cache_path, err)
))?;
match tokio::process::Command::new(prepare_worker_program_path)
.arg("--check-can-unshare-user-namespace-and-change-root")
.arg(cache_dir_tempdir.path())
.output()
.await
{
Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
let stderr = std::str::from_utf8(&output.stderr)
.expect("child process writes a UTF-8 string to stderr; qed")
.trim();
if stderr.is_empty() {
Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
"not available".into()
))
} else {
Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
format!("not available: {}", stderr)
))
}
},
Err(err) =>
Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
format!("could not start child process: {}", err)
)),
}
spawn_process_for_security_check(
prepare_worker_program_path,
"--check-can-unshare-user-namespace-and-change-root",
&[cache_dir_tempdir.path()],
).await.map_err(|err| SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(err))
} else {
Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
"only available on Linux".into()
@@ -253,37 +246,17 @@ async fn check_landlock(
) -> SecureModeResult {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
match tokio::process::Command::new(prepare_worker_program_path)
.arg("--check-can-enable-landlock")
.output()
.await
{
Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
let abi =
polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8;
let stderr = std::str::from_utf8(&output.stderr)
.expect("child process writes a UTF-8 string to stderr; qed")
.trim();
if stderr.is_empty() {
Err(SecureModeError::CannotEnableLandlock(
format!("landlock ABI {} not available", abi)
))
} else {
Err(SecureModeError::CannotEnableLandlock(
format!("not available: {}", stderr)
))
}
},
Err(err) =>
Err(SecureModeError::CannotEnableLandlock(
format!("could not start child process: {}", err)
)),
}
let abi = polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8;
spawn_process_for_security_check(
prepare_worker_program_path,
"--check-can-enable-landlock",
std::iter::empty::<&str>(),
).await.map_err(|err| SecureModeError::CannotEnableLandlock { err, abi })
} else {
Err(SecureModeError::CannotEnableLandlock(
"only available on Linux".into()
))
Err(SecureModeError::CannotEnableLandlock {
err: "only available on Linux".into(),
abi: 0,
})
}
}
}
@@ -301,31 +274,11 @@ async fn check_seccomp(
if #[cfg(target_os = "linux")] {
cfg_if::cfg_if! {
if #[cfg(target_arch = "x86_64")] {
match tokio::process::Command::new(prepare_worker_program_path)
.arg("--check-can-enable-seccomp")
.output()
.await
{
Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
let stderr = std::str::from_utf8(&output.stderr)
.expect("child process writes a UTF-8 string to stderr; qed")
.trim();
if stderr.is_empty() {
Err(SecureModeError::CannotEnableSeccomp(
"not available".into()
))
} else {
Err(SecureModeError::CannotEnableSeccomp(
format!("not available: {}", stderr)
))
}
},
Err(err) =>
Err(SecureModeError::CannotEnableSeccomp(
format!("could not start child process: {}", err)
)),
}
spawn_process_for_security_check(
prepare_worker_program_path,
"--check-can-enable-seccomp",
std::iter::empty::<&str>(),
).await.map_err(|err| SecureModeError::CannotEnableSeccomp(err))
} else {
Err(SecureModeError::CannotEnableSeccomp(
"only supported on CPUs from the x86_64 family (usually Intel or AMD)".into()
@@ -348,24 +301,85 @@ async fn check_seccomp(
}
}
/// Checks whether `clone` can be called with all sandboxing flags.
///
/// On Linux the check is delegated to a freshly spawned prepare-worker process invoked with
/// `--check-can-do-secure-clone`, so that it runs as close as possible to how a real worker
/// would. The worker is expected to exit with status 0 on success and -1 on failure. Any
/// failure is reported as `SecureModeError::CannotDoSecureClone`.
///
/// On every other target the check fails unconditionally.
async fn check_can_do_secure_clone(
	#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
	prepare_worker_program_path: &Path,
) -> SecureModeResult {
	cfg_if::cfg_if! {
		if #[cfg(target_os = "linux")] {
			spawn_process_for_security_check(
				prepare_worker_program_path,
				"--check-can-do-secure-clone",
				std::iter::empty::<&str>(),
			)
			.await
			.map_err(SecureModeError::CannotDoSecureClone)
		} else {
			Err(SecureModeError::CannotDoSecureClone(String::from("only available on Linux")))
		}
	}
}
/// Runs the prepare worker binary with a security-check argument and reports whether the check
/// passed.
///
/// The child is started with a cleared environment; only `RUST_LOG` is passed through when it
/// is set in the current process.
///
/// # Arguments
///
/// - `prepare_worker_program_path`: Path of the program to spawn.
///
/// - `check_arg`: The first argument passed to the program, selecting the check to run.
///
/// - `extra_args`: Further arguments appended after `check_arg`.
///
/// # Returns
///
/// - `Ok(())` if the child exits successfully.
///
/// - `Err("not available")` or `Err("not available: <stderr>")` if the child exits with a
///   failure status, depending on whether it wrote anything (other than whitespace) to stderr.
///
/// - `Err("could not start child process: <error>")` if the child could not be run at all.
#[cfg(target_os = "linux")]
async fn spawn_process_for_security_check<I, S>(
	prepare_worker_program_path: &Path,
	check_arg: &'static str,
	extra_args: I,
) -> Result<(), String>
where
	I: IntoIterator<Item = S>,
	S: AsRef<std::ffi::OsStr>,
{
	let mut command = tokio::process::Command::new(prepare_worker_program_path);
	// Clear env vars. (In theory, running checks with different env vars could result in different
	// outcomes of the checks.)
	command.env_clear();
	// Add back any env vars we want to keep. `var_os` is used so that a value that is not valid
	// Unicode is still forwarded instead of being silently dropped.
	if let Some(value) = std::env::var_os("RUST_LOG") {
		command.env("RUST_LOG", value);
	}

	match command.arg(check_arg).args(extra_args).output().await {
		Ok(output) if output.status.success() => Ok(()),
		Ok(output) => {
			// The child is an external program, so do not assume its stderr is valid UTF-8:
			// decode lossily rather than panicking in the host on unexpected bytes.
			let stderr = String::from_utf8_lossy(&output.stderr);
			let stderr = stderr.trim();
			if stderr.is_empty() {
				Err("not available".into())
			} else {
				Err(format!("not available: {}", stderr))
			}
		},
		Err(err) => Err(format!("could not start child process: {}", err)),
	}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_secure_mode_error_optionality() {
let err = SecureModeError::CannotEnableLandlock(String::new());
let err = SecureModeError::CannotEnableLandlock { err: String::new(), abi: 3 };
assert!(err.is_allowed_in_secure_mode(&SecurityStatus {
secure_validator_mode: true,
can_enable_landlock: false,
can_enable_seccomp: false,
can_unshare_user_namespace_and_change_root: true
can_unshare_user_namespace_and_change_root: true,
can_do_secure_clone: true,
}));
assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
secure_validator_mode: true,
can_enable_landlock: false,
can_enable_seccomp: true,
can_unshare_user_namespace_and_change_root: false
can_unshare_user_namespace_and_change_root: false,
can_do_secure_clone: false,
}));
let err = SecureModeError::CannotEnableSeccomp(String::new());
@@ -373,13 +387,15 @@ mod tests {
secure_validator_mode: true,
can_enable_landlock: false,
can_enable_seccomp: false,
can_unshare_user_namespace_and_change_root: true
can_unshare_user_namespace_and_change_root: true,
can_do_secure_clone: true,
}));
assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
secure_validator_mode: true,
can_enable_landlock: false,
can_enable_seccomp: true,
can_unshare_user_namespace_and_change_root: false
can_unshare_user_namespace_and_change_root: false,
can_do_secure_clone: false,
}));
let err = SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(String::new());
@@ -387,13 +403,31 @@ mod tests {
secure_validator_mode: true,
can_enable_landlock: true,
can_enable_seccomp: false,
can_unshare_user_namespace_and_change_root: false
can_unshare_user_namespace_and_change_root: false,
can_do_secure_clone: false,
}));
assert!(!err.is_allowed_in_secure_mode(&SecurityStatus {
secure_validator_mode: true,
can_enable_landlock: false,
can_enable_seccomp: true,
can_unshare_user_namespace_and_change_root: false
can_unshare_user_namespace_and_change_root: false,
can_do_secure_clone: false,
}));
let err = SecureModeError::CannotDoSecureClone(String::new());
assert!(err.is_allowed_in_secure_mode(&SecurityStatus {
secure_validator_mode: true,
can_enable_landlock: true,
can_enable_seccomp: true,
can_unshare_user_namespace_and_change_root: true,
can_do_secure_clone: true,
}));
assert!(err.is_allowed_in_secure_mode(&SecurityStatus {
secure_validator_mode: false,
can_enable_landlock: false,
can_enable_seccomp: false,
can_unshare_user_namespace_and_change_root: false,
can_do_secure_clone: false,
}));
}
}
+3
View File
@@ -457,10 +457,13 @@ async fn all_security_features_work() {
assert_eq!(
host.security_status().await,
SecurityStatus {
// Disabled in tests to not enforce the presence of security features. This CI-only test
// is the only one that tests them.
secure_validator_mode: false,
can_enable_landlock,
can_enable_seccomp: true,
can_unshare_user_namespace_and_change_root: true,
can_do_secure_clone: true,
}
);
}
+4 -12
View File
@@ -94,7 +94,7 @@ fn find_process_by_sid_and_name(
found
}
/// Sets up the test and makes sure everything gets cleaned up after.
/// Sets up the test.
///
/// We run the runtime manually because `#[tokio::test]` doesn't work in `rusty_fork_test!`.
fn test_wrapper<F, Fut>(f: F)
@@ -112,14 +112,6 @@ where
// Pass a clone of the host so that it does not get dropped after.
f(host.clone(), sid).await;
// Sleep to give processes a chance to get cleaned up, preventing races in the next step.
tokio::time::sleep(Duration::from_millis(500)).await;
// Make sure job processes got cleaned up. Pass `is_direct_child: false` to target the
// job processes.
assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none());
assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none());
});
}
@@ -127,7 +119,7 @@ where
// then finding the child process that matches the session ID and expected process name and doing
// something with that child.
rusty_fork_test! {
// Everything succeeded. All created subprocesses for jobs should get cleaned up, to avoid memory leaks.
// Everything succeeds.
#[test]
fn successful_prepare_and_validate() {
test_wrapper(|host, _sid| async move {
@@ -331,7 +323,7 @@ rusty_fork_test! {
// monitor, and memory tracking.
assert_eq!(
get_num_threads_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false),
4
polkadot_node_core_pvf_prepare_worker::PREPARE_WORKER_THREAD_NUMBER as i64,
);
// End the test.
@@ -374,7 +366,7 @@ rusty_fork_test! {
// time monitor.
assert_eq!(
get_num_threads_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false),
3
polkadot_node_core_pvf_execute_worker::EXECUTE_WORKER_THREAD_NUMBER as i64,
);
// End the test.
@@ -0,0 +1,22 @@
title: "Use clone instead of fork on pvf"
doc:
- audience: Node Operator
description: |
For validators: Adds a new, optional security capability.
Most modern Linux machines should support it. If yours does not, you will get a warning like:
"- Optional: Cannot call clone with all sandboxing flags, a Linux-specific kernel security features: not available"
If you are already running in a secure environment such as a container, this may conflict with our security features; your only option may be to ignore the warning.
Otherwise, it is recommended to upgrade your Linux version!
migrations:
db: []
runtime: []
crates:
- name: polkadot-node-core-pvf
- name: polkadot-node-core-pvf-prepare-worker
- name: polkadot-node-core-pvf-execute-worker
host_functions: []