PVF: more filesystem sandboxing (#1373)

This commit is contained in:
Marcin S
2023-09-28 18:24:29 +02:00
committed by GitHub
parent de71fecc4e
commit c1eb342b14
24 changed files with 1528 additions and 612 deletions
+4 -3
View File
@@ -172,9 +172,10 @@ impl Artifacts {
///
/// The recognized artifacts will be filled in the table and unrecognized will be removed.
pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all its parents are created.
// First delete the entire cache. Nodes are long-running so this should populate shortly.
// First delete the entire cache. This includes artifacts and any leftover worker dirs (see
// [`WorkerDir`]). Nodes are long-running so this should populate shortly.
let _ = tokio::fs::remove_dir_all(cache_path).await;
// Make sure that the cache path directory and all its parents are created.
let _ = tokio::fs::create_dir_all(cache_path).await;
Self { artifacts: HashMap::new() }
@@ -295,7 +296,7 @@ mod tests {
#[tokio::test]
async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = crate::worker_intf::tmpfile("test-cache").await.unwrap();
let fake_cache_path = crate::worker_intf::tmppath("test-cache").await.unwrap();
let fake_artifact_path = {
let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
@@ -30,6 +30,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use polkadot_node_core_pvf_common::SecurityStatus;
use polkadot_primitives::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap;
use std::{
@@ -139,8 +140,10 @@ struct Queue {
// Some variables related to the current session.
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
/// The queue of jobs that are waiting for a worker to pick up.
queue: VecDeque<ExecuteJob>,
@@ -152,16 +155,20 @@ impl Queue {
fn new(
metrics: Metrics,
program_path: PathBuf,
cache_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
to_queue_rx: mpsc::Receiver<ToQueue>,
) -> Self {
Self {
metrics,
program_path,
cache_path,
spawn_timeout,
node_version,
security_status,
to_queue_rx,
queue: VecDeque::new(),
mux: Mux::new(),
@@ -405,9 +412,11 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
queue.mux.push(
spawn_worker_task(
queue.program_path.clone(),
queue.cache_path.clone(),
job,
queue.spawn_timeout,
queue.node_version.clone(),
queue.security_status.clone(),
)
.boxed(),
);
@@ -423,18 +432,22 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
/// execute other jobs with a compatible execution environment.
async fn spawn_worker_task(
program_path: PathBuf,
cache_path: PathBuf,
job: ExecuteJob,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> QueueEvent {
use futures_timer::Delay;
loop {
match super::worker_intf::spawn(
&program_path,
&cache_path,
job.executor_params.clone(),
spawn_timeout,
node_version.as_deref(),
security_status.clone(),
)
.await
{
@@ -496,17 +509,21 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
pub fn start(
metrics: Metrics,
program_path: PathBuf,
cache_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
let run = Queue::new(
metrics,
program_path,
cache_path,
worker_capacity,
spawn_timeout,
node_version,
security_status,
to_queue_rx,
)
.run();
+145 -76
View File
@@ -19,8 +19,8 @@
use crate::{
artifacts::ArtifactPathId,
worker_intf::{
path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
@@ -30,7 +30,7 @@ use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::InternalValidationError,
execute::{Handshake, Response},
framed_recv, framed_send,
worker_dir, SecurityStatus,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
@@ -38,21 +38,30 @@ use std::{path::Path, time::Duration};
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
/// Sends a handshake message to the worker as soon as it is spawned.
///
/// The program should be able to handle `<program-path> execute-worker <socket-path>` invocation.
/// Sends a handshake message to the worker as soon as it is spawned.
pub async fn spawn(
program_path: &Path,
cache_path: &Path,
executor_params: ExecutorParams,
spawn_timeout: Duration,
node_version: Option<&str>,
security_status: SecurityStatus,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let mut extra_args = vec!["execute-worker"];
if let Some(node_version) = node_version {
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
}
let (mut idle_worker, worker_handle) =
spawn_with_program_path("execute", program_path, &extra_args, spawn_timeout).await?;
let (mut idle_worker, worker_handle) = spawn_with_program_path(
"execute",
program_path,
cache_path,
&extra_args,
spawn_timeout,
security_status,
)
.await?;
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
.await
.map_err(|error| {
@@ -104,89 +113,151 @@ pub async fn start_work(
execution_timeout: Duration,
validation_params: Vec<u8>,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
let IdleWorker { mut stream, pid, worker_dir } = worker;
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
?worker_dir,
validation_code_hash = ?artifact.id.code_hash,
"starting execute for {}",
artifact.path.display(),
);
if let Err(error) =
send_request(&mut stream, &artifact.path, &validation_params, execution_timeout).await
{
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to send an execute request",
);
return Outcome::IoErr
}
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let response = futures::select! {
response = recv_response(&mut stream).fuse() => {
match response {
Err(error) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to recv an execute response",
);
return Outcome::IoErr
},
Ok(response) => {
if let Response::Ok{duration, ..} = response {
if duration > execution_timeout {
// The job didn't complete within the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"execute job took {}ms cpu time, exceeded execution timeout {}ms.",
duration.as_millis(),
execution_timeout.as_millis(),
);
// Return a timeout error.
return Outcome::HardTimeout;
}
}
response
},
}
},
_ = Delay::new(timeout).fuse() => {
with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move {
if let Err(error) = send_request(&mut stream, &validation_params, execution_timeout).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded lenient timeout for execution, child worker likely stalled",
?error,
"failed to send an execute request",
);
Response::TimedOut
},
};
return Outcome::IoErr
}
match response {
Response::Ok { result_descriptor, duration } =>
Outcome::Ok { result_descriptor, duration, idle_worker: IdleWorker { stream, pid } },
Response::InvalidCandidate(err) =>
Outcome::InvalidCandidate { err, idle_worker: IdleWorker { stream, pid } },
Response::TimedOut => Outcome::HardTimeout,
Response::Panic(err) => Outcome::Panic { err },
Response::InternalError(err) => Outcome::InternalError { err },
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let response = futures::select! {
response = recv_response(&mut stream).fuse() => {
match response {
Err(error) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to recv an execute response",
);
return Outcome::IoErr
},
Ok(response) => {
if let Response::Ok{duration, ..} = response {
if duration > execution_timeout {
// The job didn't complete within the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"execute job took {}ms cpu time, exceeded execution timeout {}ms.",
duration.as_millis(),
execution_timeout.as_millis(),
);
// Return a timeout error.
return Outcome::HardTimeout;
}
}
response
},
}
},
_ = Delay::new(timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded lenient timeout for execution, child worker likely stalled",
);
Response::TimedOut
},
};
match response {
Response::Ok { result_descriptor, duration } => Outcome::Ok {
result_descriptor,
duration,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
Response::InvalidCandidate(err) => Outcome::InvalidCandidate {
err,
idle_worker: IdleWorker { stream, pid, worker_dir },
},
Response::TimedOut => Outcome::HardTimeout,
Response::Panic(err) => Outcome::Panic { err },
Response::InternalError(err) => Outcome::InternalError { err },
}
})
.await
}
/// Hard-link the artifact into the worker dir, execute the given future/closure passing the
/// worker dir in, and clean up the worker dir afterwards.
///
/// The link is created by the host at the location returned by
/// [`worker_dir::execute_artifact`]. If the link cannot be created, `f` is never called and
/// `Outcome::InternalError` carrying `InternalValidationError::CouldNotCreateLink` is returned.
///
/// Failure to clean up the worker dir results in an error - leaving any files here could be a
/// security issue, and we should shut down the worker. This should be very rare. In that case
/// the outcome produced by `f` is discarded and `Outcome::InternalError` carrying
/// `InternalValidationError::CouldNotClearWorkerDir` is returned instead.
async fn with_worker_dir_setup<F, Fut>(
	worker_dir: WorkerDir,
	pid: u32,
	artifact_path: &Path,
	f: F,
) -> Outcome
where
	Fut: futures::Future<Output = Outcome>,
	F: FnOnce(WorkerDir) -> Fut,
{
	// Cheaply create a hard link to the artifact. The artifact is always at a known location in the
	// worker cache, and the child can't access any other artifacts or gain any information from the
	// original filename.
	let link_path = worker_dir::execute_artifact(&worker_dir.path);
	if let Err(err) = tokio::fs::hard_link(artifact_path, link_path).await {
		gum::warn!(
			target: LOG_TARGET,
			worker_pid = %pid,
			?worker_dir,
			"failed to create a hard link to the artifact in the worker dir: {:?}",
			err,
		);
		return Outcome::InternalError {
			err: InternalValidationError::CouldNotCreateLink(format!("{:?}", err)),
		}
	}

	// `worker_dir` is moved into `f`, so keep a copy of the path for the cleanup below.
	let worker_dir_path = worker_dir.path.clone();
	let outcome = f(worker_dir).await;

	// Try to clear the worker dir. This runs regardless of what `f` returned.
	if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
		gum::warn!(
			target: LOG_TARGET,
			worker_pid = %pid,
			?worker_dir_path,
			"failed to clear worker cache after the job: {:?}",
			err,
		);
		return Outcome::InternalError {
			err: InternalValidationError::CouldNotClearWorkerDir {
				err: format!("{:?}", err),
				// `None` if the path is not valid UTF-8.
				path: worker_dir_path.to_str().map(String::from),
			},
		}
	}

	outcome
}
async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
@@ -195,11 +266,9 @@ async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Re
async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
validation_params: &[u8],
execution_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, path_to_bytes(artifact_path)).await?;
framed_send(stream, validation_params).await?;
framed_send(stream, &execution_timeout.encode()).await
}
+102 -18
View File
@@ -34,6 +34,7 @@ use futures::{
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
pvf::PvfPrepData,
SecurityStatus,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use std::{
@@ -202,8 +203,13 @@ impl Config {
pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<Output = ()>) {
gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
// Run checks for supported security features once per host startup.
warn_if_no_landlock();
// Run checks for supported security features once per host startup. Warn here if not enabled.
let security_status = {
let can_enable_landlock = check_landlock(&config.prepare_worker_program_path);
let can_unshare_user_namespace_and_change_root =
check_can_unshare_user_namespace_and_change_root(&config.prepare_worker_program_path);
SecurityStatus { can_enable_landlock, can_unshare_user_namespace_and_change_root }
};
let (to_host_tx, to_host_rx) = mpsc::channel(10);
@@ -215,6 +221,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
config.cache_path.clone(),
config.prepare_worker_spawn_timeout,
config.node_version.clone(),
security_status.clone(),
);
let (to_prepare_queue_tx, from_prepare_queue_rx, run_prepare_queue) = prepare::start_queue(
@@ -229,9 +236,11 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
let (to_execute_queue_tx, run_execute_queue) = execute::start(
metrics,
config.execute_worker_program_path.to_owned(),
config.cache_path.clone(),
config.execute_workers_max_num,
config.execute_worker_spawn_timeout,
config.node_version,
security_status,
);
let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
@@ -873,28 +882,103 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
.map(|_| ())
}
/// Check if landlock is supported and emit a warning if not.
fn warn_if_no_landlock() {
#[cfg(target_os = "linux")]
{
use polkadot_node_core_pvf_common::worker::security::landlock;
let status = landlock::get_status();
if !landlock::status_is_fully_enabled(&status) {
let abi = landlock::LANDLOCK_ABI as u8;
/// Check if we can sandbox the root and emit a warning if not.
///
/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible
/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on
/// success and -1 on failure.
///
/// Returns `true` only if the child process could be started and exited successfully. On
/// non-Linux targets no process is spawned and the result is always `false`.
fn check_can_unshare_user_namespace_and_change_root(
	#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
	prepare_worker_program_path: &Path,
) -> bool {
	cfg_if::cfg_if! {
		if #[cfg(target_os = "linux")] {
			// Use `output()` rather than `status()` so the child's stderr is captured and can be
			// included in the warning below.
			let output = std::process::Command::new(prepare_worker_program_path)
				.arg("--check-can-unshare-user-namespace-and-change-root")
				.output();

			match output {
				Ok(output) if output.status.success() => true,
				Ok(output) => {
					// Decode lossily: the bytes come from another process, and a diagnostic
					// message that is not valid UTF-8 must not be able to panic the host.
					let stderr = String::from_utf8_lossy(&output.stderr);
					let stderr = stderr.trim();
					gum::warn!(
						target: LOG_TARGET,
						?prepare_worker_program_path,
						// Docs say to always print status using `Display` implementation.
						status = %output.status,
						%stderr,
						"Cannot unshare user namespace and change root, which are Linux-specific kernel security features. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running with support for unsharing user namespaces for maximum security."
					);
					false
				},
				Err(err) => {
					// The worker binary could not be executed at all.
					gum::warn!(
						target: LOG_TARGET,
						?prepare_worker_program_path,
						"Could not start child process: {}",
						err
					);
					false
				},
			}
		} else {
			gum::warn!(
				target: LOG_TARGET,
				"Cannot unshare user namespace and change root, which are Linux-specific kernel security features. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with support for unsharing user namespaces for maximum security."
			);
			false
		}
	}
}
#[cfg(not(target_os = "linux"))]
gum::warn!(
target: LOG_TARGET,
"Cannot enable landlock, a Linux kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security."
);
/// Check if landlock is supported and emit a warning if not.
///
/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible
/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on
/// success and -1 on failure.
///
/// Returns `true` only if the child process could be started and exited successfully. On
/// non-Linux targets no process is spawned and the result is always `false`.
fn check_landlock(
	#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
	prepare_worker_program_path: &Path,
) -> bool {
	cfg_if::cfg_if! {
		if #[cfg(target_os = "linux")] {
			// Only the exit status is inspected here; the child's output is not captured.
			match std::process::Command::new(prepare_worker_program_path)
				.arg("--check-can-enable-landlock")
				.status()
			{
				Ok(status) if status.success() => true,
				Ok(status) => {
					let abi =
						polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8;
					gum::warn!(
						target: LOG_TARGET,
						?prepare_worker_program_path,
						// Docs say to always print status using `Display` implementation.
						%status,
						%abi,
						"Cannot fully enable landlock, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider upgrading the kernel version for maximum security."
					);
					false
				},
				Err(err) => {
					// The worker binary could not be executed at all.
					gum::warn!(
						target: LOG_TARGET,
						?prepare_worker_program_path,
						"Could not start child process: {}",
						err
					);
					false
				},
			}
		} else {
			gum::warn!(
				target: LOG_TARGET,
				"Cannot enable landlock, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security."
			);
			false
		}
	}
}
#[cfg(test)]
+1
View File
@@ -111,6 +111,7 @@ pub use polkadot_node_core_pvf_common::{
error::{InternalValidationError, PrepareError},
prepare::{PrepareJobKind, PrepareStats},
pvf::PvfPrepData,
SecurityStatus,
};
/// The log target for this crate.
+45 -7
View File
@@ -27,6 +27,7 @@ use futures::{
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
pvf::PvfPrepData,
SecurityStatus,
};
use slotmap::HopSlotMap;
use std::{
@@ -110,10 +111,12 @@ enum PoolEvent {
type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
struct Pool {
// Some variables related to the current session.
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
to_pool: mpsc::Receiver<ToPool>,
from_pool: mpsc::UnboundedSender<FromPool>,
@@ -132,6 +135,7 @@ async fn run(
cache_path,
spawn_timeout,
node_version,
security_status,
to_pool,
mut from_pool,
mut spawned,
@@ -160,6 +164,7 @@ async fn run(
&cache_path,
spawn_timeout,
node_version.clone(),
security_status.clone(),
&mut spawned,
&mut mux,
to_pool,
@@ -207,6 +212,7 @@ fn handle_to_pool(
cache_path: &Path,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
spawned: &mut HopSlotMap<Worker, WorkerData>,
mux: &mut Mux,
to_pool: ToPool,
@@ -216,7 +222,14 @@ fn handle_to_pool(
gum::debug!(target: LOG_TARGET, "spawning a new prepare worker");
metrics.prepare_worker().on_begin_spawn();
mux.push(
spawn_worker_task(program_path.to_owned(), spawn_timeout, node_version).boxed(),
spawn_worker_task(
program_path.to_owned(),
cache_path.to_owned(),
spawn_timeout,
node_version,
security_status,
)
.boxed(),
);
},
ToPool::StartWork { worker, pvf, artifact_path } => {
@@ -229,7 +242,6 @@ fn handle_to_pool(
worker,
idle,
pvf,
cache_path.to_owned(),
artifact_path,
preparation_timer,
)
@@ -258,13 +270,23 @@ fn handle_to_pool(
async fn spawn_worker_task(
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> PoolEvent {
use futures_timer::Delay;
loop {
match worker_intf::spawn(&program_path, spawn_timeout, node_version.as_deref()).await {
match worker_intf::spawn(
&program_path,
&cache_path,
spawn_timeout,
node_version.as_deref(),
security_status.clone(),
)
.await
{
Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
@@ -281,11 +303,10 @@ async fn start_work_task<Timer>(
worker: Worker,
idle: IdleWorker,
pvf: PvfPrepData,
cache_path: PathBuf,
artifact_path: PathBuf,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker_intf::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
let outcome = worker_intf::start_work(&metrics, idle, pvf, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}
@@ -322,14 +343,29 @@ fn handle_mux(
),
// Return `Concluded`, but do not kill the worker since the error was on the host
// side.
Outcome::RenameTmpFileErr { worker: idle, result: _, err } =>
Outcome::RenameTmpFileErr { worker: idle, result: _, err, src, dest } =>
handle_concluded_no_rip(
from_pool,
spawned,
worker,
idle,
Err(PrepareError::RenameTmpFileErr(err)),
Err(PrepareError::RenameTmpFileErr { err, src, dest }),
),
// Could not clear worker cache. Kill the worker so other jobs can't see the data.
Outcome::ClearWorkerDir { err } => {
if attempt_retire(metrics, spawned, worker) {
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::ClearWorkerDir(err)),
},
)?;
}
Ok(())
},
Outcome::Unreachable => {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Rip(worker))?;
@@ -434,6 +470,7 @@ pub fn start(
cache_path: PathBuf,
spawn_timeout: Duration,
node_version: Option<String>,
security_status: SecurityStatus,
) -> (mpsc::Sender<ToPool>, mpsc::UnboundedReceiver<FromPool>, impl Future<Output = ()>) {
let (to_pool_tx, to_pool_rx) = mpsc::channel(10);
let (from_pool_tx, from_pool_rx) = mpsc::unbounded();
@@ -444,6 +481,7 @@ pub fn start(
cache_path,
spawn_timeout,
node_version,
security_status,
to_pool: to_pool_rx,
from_pool: from_pool_tx,
spawned: HopSlotMap::with_capacity_and_key(20),
+139 -107
View File
@@ -19,17 +19,17 @@
use crate::{
metrics::Metrics,
worker_intf::{
path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
clear_worker_dir_path, framed_recv, framed_send, spawn_with_program_path, IdleWorker,
SpawnErr, WorkerDir, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
framed_recv, framed_send,
prepare::PrepareStats,
pvf::PvfPrepData,
worker_dir, SecurityStatus,
};
use sp_core::hexdisplay::HexDisplay;
@@ -41,19 +41,33 @@ use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
/// The program should be able to handle `<program-path> prepare-worker <socket-path>` invocation.
/// Sends a handshake message to the worker as soon as it is spawned.
pub async fn spawn(
program_path: &Path,
cache_path: &Path,
spawn_timeout: Duration,
node_version: Option<&str>,
security_status: SecurityStatus,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let mut extra_args = vec!["prepare-worker"];
if let Some(node_version) = node_version {
extra_args.extend_from_slice(&["--node-impl-version", node_version]);
}
spawn_with_program_path("prepare", program_path, &extra_args, spawn_timeout).await
spawn_with_program_path(
"prepare",
program_path,
cache_path,
&extra_args,
spawn_timeout,
security_status,
)
.await
}
/// Outcome of PVF preparation.
///
/// If the idle worker token is not returned, it means the worker must be terminated.
pub enum Outcome {
/// The worker has finished the work assigned to it.
Concluded { worker: IdleWorker, result: PrepareResult },
@@ -62,9 +76,19 @@ pub enum Outcome {
Unreachable,
/// The temporary file for the artifact could not be created at the given cache path.
CreateTmpFileErr { worker: IdleWorker, err: String },
/// The response from the worker is received, but the file cannot be renamed (moved) to the
/// The response from the worker is received, but the tmp file cannot be renamed (moved) to the
/// final destination location.
RenameTmpFileErr { worker: IdleWorker, result: PrepareResult, err: String },
RenameTmpFileErr {
worker: IdleWorker,
result: PrepareResult,
err: String,
// Unfortunately `PathBuf` doesn't implement `Encode`/`Decode`, so we do a fallible
// conversion to `Option<String>`.
src: Option<String>,
dest: Option<String>,
},
/// The worker cache could not be cleared for the given reason.
ClearWorkerDir { err: String },
/// The worker failed to finish the job until the given deadline.
///
/// The worker is no longer usable and should be killed.
@@ -84,83 +108,88 @@ pub async fn start_work(
metrics: &Metrics,
worker: IdleWorker,
pvf: PvfPrepData,
cache_path: &Path,
artifact_path: PathBuf,
) -> Outcome {
let IdleWorker { stream, pid } = worker;
let IdleWorker { stream, pid, worker_dir } = worker;
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
?worker_dir,
"starting prepare for {}",
artifact_path.display(),
);
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
let preparation_timeout = pvf.prep_timeout();
if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to send a prepare request: {:?}",
err,
);
return Outcome::Unreachable
}
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines. In that case we should propagate
// the error to the pool.
//
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
match result {
// Received bytes from worker within the time limit.
Ok(Ok(prepare_result)) =>
handle_response(
metrics,
IdleWorker { stream, pid },
prepare_result,
pid,
tmp_file,
artifact_path,
preparation_timeout,
)
.await,
Ok(Err(err)) => {
// Communication error within the time limit.
with_worker_dir_setup(
worker_dir,
stream,
pid,
|tmp_artifact_file, mut stream, worker_dir| async move {
let preparation_timeout = pvf.prep_timeout();
if let Err(err) = send_request(&mut stream, pvf).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
"failed to send a prepare request: {:?}",
err,
);
Outcome::IoErr(err.to_string())
},
Err(_) => {
// Timed out here on the host.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"did not recv a prepare response within the time limit",
);
Outcome::TimedOut
},
}
})
return Outcome::Unreachable
}
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines. In that case we should
// propagate the error to the pool.
//
// We use a generous timeout here. This is in addition to the one in the child process,
// in case the child stalls. We have a wall clock timeout here in the host, but a CPU
// timeout in the child. We want to use CPU time because it varies less than wall clock
// time under load, but the CPU resources of the child can only be measured from the
// parent after the child process terminates.
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
match result {
// Received bytes from worker within the time limit.
Ok(Ok(prepare_result)) =>
handle_response(
metrics,
IdleWorker { stream, pid, worker_dir },
prepare_result,
pid,
tmp_artifact_file,
artifact_path,
preparation_timeout,
)
.await,
Ok(Err(err)) => {
// Communication error within the time limit.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
err,
);
Outcome::IoErr(err.to_string())
},
Err(_) => {
// Timed out here on the host.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"did not recv a prepare response within the time limit",
);
Outcome::TimedOut
},
}
},
)
.await
}
/// Handles the case where we successfully received response bytes on the host from the child.
///
/// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
/// cleared by `with_tmp_file`.
/// Here we know the artifact exists, but is still located in a temporary file which will be cleared
/// by [`with_worker_dir_setup`].
async fn handle_response(
metrics: &Metrics,
worker: IdleWorker,
@@ -209,7 +238,13 @@ async fn handle_response(
artifact_path.display(),
err,
);
Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) }
Outcome::RenameTmpFileErr {
worker,
result,
err: format!("{:?}", err),
src: tmp_file.to_str().map(String::from),
dest: artifact_path.to_str().map(String::from),
}
},
};
@@ -220,61 +255,58 @@ async fn handle_response(
outcome
}
/// Create a temporary file for an artifact at the given cache path and execute the given
/// future/closure passing the file path in.
/// Create a temporary file for an artifact in the worker cache, execute the given future/closure
/// passing the file path in, and clean up the worker cache.
///
/// The function will try best effort to not leave behind the temporary file.
async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome
/// Failure to clean up the worker cache results in an error - leaving any files here could be a
/// security issue, and we should shut down the worker. This should be very rare.
async fn with_worker_dir_setup<F, Fut>(
worker_dir: WorkerDir,
stream: UnixStream,
pid: u32,
f: F,
) -> Outcome
where
Fut: futures::Future<Output = Outcome>,
F: FnOnce(PathBuf, UnixStream) -> Fut,
F: FnOnce(PathBuf, UnixStream, WorkerDir) -> Fut,
{
let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await {
Ok(f) => f,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to create a temp file for the artifact: {:?}",
err,
);
return Outcome::CreateTmpFileErr {
worker: IdleWorker { stream, pid },
err: format!("{:?}", err),
}
},
// Create the tmp file here so that the child doesn't need any file creation rights. This will
// be cleared at the end of this function.
let tmp_file = worker_dir::prepare_tmp_artifact(&worker_dir.path);
if let Err(err) = tokio::fs::File::create(&tmp_file).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
?worker_dir,
"failed to create a temp file for the artifact: {:?}",
err,
);
return Outcome::CreateTmpFileErr {
worker: IdleWorker { stream, pid, worker_dir },
err: format!("{:?}", err),
}
};
let outcome = f(tmp_file.clone(), stream).await;
// `worker_dir` is moved into `f` on the following call, so copy its path first; the cleanup
// after the job operates on this copy.
let worker_dir_path = worker_dir.path.clone();
let outcome = f(tmp_file, stream, worker_dir).await;
// The function called above is expected to move `tmp_file` to a new location upon success.
// However, the function may as well fail and in that case we should remove the tmp file here.
//
// In any case, we try to remove the file here so that there are no leftovers. We only report
// errors that are different from the `NotFound`.
match tokio::fs::remove_file(tmp_file).await {
Ok(()) => (),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to remove the tmp file: {:?}",
err,
);
},
// Try to clear the worker dir.
if let Err(err) = clear_worker_dir_path(&worker_dir_path) {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
?worker_dir_path,
"failed to clear worker cache after the job: {:?}",
err,
);
// This early return replaces the `outcome` produced by `f`: the returned variant carries
// only the formatted cleanup error.
return Outcome::ClearWorkerDir { err: format!("{:?}", err) }
}
outcome
}
/// Sends the encoded `pvf` to the worker over `stream` using [`framed_send`], i.e. as a
/// length-prefixed frame. Any I/O error from the send is propagated to the caller.
async fn send_request(
stream: &mut UnixStream,
pvf: PvfPrepData,
tmp_file: &Path,
) -> io::Result<()> {
async fn send_request(stream: &mut UnixStream, pvf: PvfPrepData) -> io::Result<()> {
framed_send(stream, &pvf.encode()).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
Ok(())
}
+202 -96
View File
@@ -20,6 +20,7 @@ use crate::LOG_TARGET;
use futures::FutureExt as _;
use futures_timer::Delay;
use pin_project::pin_project;
use polkadot_node_core_pvf_common::{worker_dir, SecurityStatus};
use rand::Rng;
use std::{
fmt, mem,
@@ -39,99 +40,106 @@ use tokio::{
pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
/// This is publicly exposed only for integration tests.
///
/// # Parameters
///
/// - `debug_id`: An identifier for the process (e.g. "execute" or "prepare").
///
/// - `program_path`: The path to the program.
///
/// - `cache_path`: The path to the artifact cache.
///
/// - `extra_args`: Optional extra CLI arguments to the program. NOTE: Should only contain data
/// required before the handshake, like node/worker versions for the version check. Other data
/// should go through the handshake.
///
/// - `spawn_timeout`: The amount of time to wait for the child process to spawn.
///
/// - `security_status`: contains the detected status of security features.
///
/// # Errors
///
/// Returns a [`SpawnErr`] if the worker dir cannot be created, the socket cannot be bound, the
/// process cannot be spawned, accepting the worker's connection fails, or no connection arrives
/// within `spawn_timeout`.
#[doc(hidden)]
pub async fn spawn_with_program_path(
debug_id: &'static str,
program_path: impl Into<PathBuf>,
cache_path: &Path,
extra_args: &[&str],
spawn_timeout: Duration,
security_status: SecurityStatus,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let program_path = program_path.into();
with_transient_socket_path(debug_id, |socket_path| {
let socket_path = socket_path.to_owned();
let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
let worker_dir = WorkerDir::new(debug_id, cache_path).await?;
// The socket path is derived from the worker dir; the listener is bound to it below, before
// the worker process is spawned.
let socket_path = worker_dir::socket(&worker_dir.path);
async move {
let listener = UnixListener::bind(&socket_path).map_err(|err| {
let extra_args: Vec<String> = extra_args.iter().map(|arg| arg.to_string()).collect();
let listener = UnixListener::bind(&socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir,
?socket_path,
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Bind
})?;
let handle = WorkerHandle::spawn(&program_path, &extra_args, &worker_dir.path, security_status)
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir.path,
?socket_path,
"cannot spawn a worker: {:?}",
err,
);
SpawnErr::ProcessSpawn
})?;
// `worker_dir` is moved into the returned `IdleWorker` in the accept branch below, so keep a
// copy of its path for the log messages.
let worker_dir_path = worker_dir.path.clone();
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot bind unix socket: {:?}",
?worker_dir_path,
?socket_path,
"cannot accept a worker: {:?}",
err,
);
SpawnErr::Bind
SpawnErr::Accept
})?;
let handle =
WorkerHandle::spawn(&program_path, &extra_args, socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot spawn a worker: {:?}",
err,
);
SpawnErr::ProcessSpawn
})?;
futures::select! {
accept_result = listener.accept().fuse() => {
let (stream, _) = accept_result.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot accept a worker: {:?}",
err,
);
SpawnErr::Accept
})?;
Ok((IdleWorker { stream, pid: handle.id() }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
Ok((IdleWorker { stream, pid: handle.id(), worker_dir }, handle))
}
})
.await
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?worker_dir_path,
?socket_path,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
}
/// Runs `f` with a freshly chosen temporary socket path and removes that path afterwards.
///
/// The path is obtained from [`tmpfile`] with the prefix `pvf-host-<debug_id>`. If no path can
/// be obtained, `f` is never called and `SpawnErr::TmpFile` is returned. Otherwise the result of
/// `f` is returned unchanged.
async fn with_transient_socket_path<T, F, Fut>(debug_id: &'static str, f: F) -> Result<T, SpawnErr>
where
    F: FnOnce(&Path) -> Fut,
    Fut: futures::Future<Output = Result<T, SpawnErr>> + 'static,
{
    let prefix = format!("pvf-host-{}", debug_id);
    let socket_path = match tmpfile(&prefix).await {
        Ok(path) => path,
        Err(_) => return Err(SpawnErr::TmpFile),
    };

    let outcome = f(&socket_path).await;

    // Normally the worker removes the socket itself. Remove it here as well, ignoring any error,
    // so that a failed rendezvous does not leave the file behind.
    let _ = tokio::fs::remove_file(socket_path).await;

    outcome
}
/// Returns a path under the given `dir`. The file name will start with the given prefix.
/// Returns a path under the given `dir`. The path name will start with the given prefix.
///
/// There is only a certain number of retries. If exceeded this function will give up and return an
/// error.
pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
fn tmppath(prefix: &str, dir: &Path) -> PathBuf {
pub async fn tmppath_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
fn make_tmppath(prefix: &str, dir: &Path) -> PathBuf {
use rand::distributions::Alphanumeric;
const DESCRIMINATOR_LEN: usize = 10;
@@ -143,27 +151,28 @@ pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
let s = std::str::from_utf8(&buf)
.expect("the string is collected from a valid utf-8 sequence; qed");
let mut file = dir.to_owned();
file.push(s);
file
let mut path = dir.to_owned();
path.push(s);
path
}
const NUM_RETRIES: usize = 50;
for _ in 0..NUM_RETRIES {
let candidate_path = tmppath(prefix, dir);
if !candidate_path.exists() {
return Ok(candidate_path)
let tmp_path = make_tmppath(prefix, dir);
if !tmp_path.exists() {
return Ok(tmp_path)
}
}
Err(io::Error::new(io::ErrorKind::Other, "failed to create a temporary file"))
Err(io::Error::new(io::ErrorKind::Other, "failed to create a temporary path"))
}
/// The same as [`tmpfile_in`], but uses [`std::env::temp_dir`] as the directory.
pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
/// The same as [`tmppath_in`], but uses [`std::env::temp_dir`] as the directory.
///
/// Like [`tmppath_in`], this only returns a path that did not exist when it was checked; nothing
/// is created on disk.
#[cfg(test)]
pub async fn tmppath(prefix: &str) -> io::Result<PathBuf> {
let temp_dir = PathBuf::from(std::env::temp_dir());
tmpfile_in(prefix, &temp_dir).await
tmppath_in(prefix, &temp_dir).await
}
/// A struct that represents an idle worker.
@@ -177,13 +186,19 @@ pub struct IdleWorker {
/// The identifier of this process. Used to reset the niceness.
pub pid: u32,
/// The temporary per-worker path. We clean up the worker dir between jobs and delete it when
/// the worker dies.
pub worker_dir: WorkerDir,
}
/// An error happened during spawning a worker process.
#[derive(Clone, Debug)]
pub enum SpawnErr {
/// Cannot obtain a temporary file location.
TmpFile,
/// Cannot obtain a temporary path location.
TmpPath,
/// An FS error occurred.
Fs(String),
/// Cannot bind the socket to the given path.
Bind,
/// An error happened during accepting a connection to the socket.
@@ -219,12 +234,32 @@ impl WorkerHandle {
fn spawn(
program: impl AsRef<Path>,
extra_args: &[String],
socket_path: impl AsRef<Path>,
worker_dir_path: impl AsRef<Path>,
security_status: SecurityStatus,
) -> io::Result<Self> {
let mut child = process::Command::new(program.as_ref())
let security_args = {
let mut args = vec![];
if security_status.can_enable_landlock {
args.push("--can-enable-landlock".to_string());
}
if security_status.can_unshare_user_namespace_and_change_root {
args.push("--can-unshare-user-namespace-and-change-root".to_string());
}
args
};
// Clear all env vars from the spawned process.
let mut command = process::Command::new(program.as_ref());
command.env_clear();
// Add back any env vars we want to keep.
if let Ok(value) = std::env::var("RUST_LOG") {
command.env("RUST_LOG", value);
}
let mut child = command
.args(extra_args)
.arg("--socket-path")
.arg(socket_path.as_ref().as_os_str())
.arg("--worker-dir-path")
.arg(worker_dir_path.as_ref().as_os_str())
.args(&security_args)
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
.spawn()?;
@@ -306,16 +341,6 @@ impl fmt::Debug for WorkerHandle {
}
}
/// Borrows the given path as the bytes of its UTF-8 representation.
///
/// # Panics
///
/// Panics if the path is not valid UTF-8.
pub fn path_to_bytes(path: &Path) -> &[u8] {
    // Sending the raw `OsStr` and rebuilding it on the other side would be preferable, but libstd
    // offers no portable way to do that. Crates exist that can extract the raw path; one could be
    // pulled in here should non-UTF-8 paths ever be reported as a problem in practice.
    match path.to_str() {
        Some(utf8) => utf8.as_bytes(),
        None => panic!("non-UTF-8 path"),
    }
}
/// Write some data prefixed by its length into `w`.
pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
let len_buf = buf.len().to_le_bytes();
@@ -333,3 +358,84 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
r.read_exact(&mut buf).await?;
Ok(buf)
}
/// A temporary worker dir that contains only files needed by the worker. The worker will change its
/// root (the `/` directory) to this directory; it should have access to no other paths on its
/// filesystem.
///
/// NOTE: This struct cleans up its associated directory when it is dropped. Therefore it should not
/// implement `Clone`.
///
/// # File structure
///
/// The overall file structure for the PVF system is as follows. The `worker-dir-X`s are managed by
/// this struct.
///
/// ```nocompile
/// + /<cache_path>/
/// - artifact-1
/// - artifact-2
/// - [...]
/// - worker-dir-1/ (new `/` for worker-1)
/// + socket (created by host)
/// + tmp-artifact (created by host) (prepare-only)
/// + artifact (link -> artifact-1) (created by host) (execute-only)
/// - worker-dir-2/ (new `/` for worker-2)
/// + [...]
/// ```
#[derive(Debug)]
pub struct WorkerDir {
/// Location of this worker's directory on the host filesystem. This is the directory that is
/// removed when the struct is dropped.
pub path: PathBuf,
}
impl WorkerDir {
    /// Creates a fresh, empty worker dir inside `cache_dir`.
    ///
    /// The directory name starts with `worker-dir-<debug_id>-` followed by a random suffix.
    ///
    /// # Errors
    ///
    /// - `SpawnErr::TmpPath` if no unused path could be found in `cache_dir`.
    /// - `SpawnErr::Fs` (carrying the I/O error text) if the directory could not be created.
    pub async fn new(debug_id: &'static str, cache_dir: &Path) -> Result<Self, SpawnErr> {
        let dir_prefix = format!("worker-dir-{}-", debug_id);

        let path = match tmppath_in(&dir_prefix, cache_dir).await {
            Ok(candidate) => candidate,
            Err(_) => return Err(SpawnErr::TmpPath),
        };

        if let Err(err) = tokio::fs::create_dir(&path).await {
            return Err(SpawnErr::Fs(err.to_string()))
        }

        Ok(Self { path })
    }
}
// Best-effort removal of the temporary worker dir once the worker's lifetime ends. The cache is
// wiped on startup anyway, so a failure here is deliberately ignored.
impl Drop for WorkerDir {
    fn drop(&mut self) {
        let Self { path } = self;
        let _ = std::fs::remove_dir_all(path);
    }
}
// Kept synchronous because recursive `async fn`s are awkward in Rust, and the worker dir is
// expected to hold only a handful of files.
//
// TODO: A lingering malicious job can still access future files in this dir. See
// <https://github.com/paritytech/polkadot-sdk/issues/574> for how to fully secure this.
/// Empties the temporary worker dir while leaving the directory itself in place. The directory
/// must survive because the worker has mounted its own separate filesystem on it.
///
/// Meant to be called right after a job finishes, so that the next job cannot see artifacts left
/// by the previous one.
///
/// # Errors
///
/// Returns the first I/O error hit while listing or removing entries. A `NotFound` error is
/// reported as success.
pub fn clear_worker_dir_path(worker_dir_path: &Path) -> io::Result<()> {
    /// Recursively deletes everything below `dir`, keeping `dir` itself.
    fn purge_children(dir: &Path) -> io::Result<()> {
        std::fs::read_dir(dir)?.try_for_each(|child| {
            let child = child?;
            let child_path = child.path();
            // `file_type` does not follow symlinks, so a link to a directory is unlinked as a
            // file rather than descended into.
            if !child.file_type()?.is_dir() {
                return std::fs::remove_file(child_path)
            }
            purge_children(&child_path)?;
            std::fs::remove_dir(child_path)
        })
    }

    let cleared = purge_children(worker_dir_path);
    match &cleared {
        // The worker dir may already be gone: the worker died and its dir was cleaned up.
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
        _ => cleared,
    }
}