PVF: Move PVF workers into separate crate (#7101)

* Move PVF workers into separate crate

* Fix indentation

* Fix compilation errors

* Fix more compilation errors

* Rename `worker.rs` files, make host interface to worker more clear

* Fix more compilation errors

* Fix more compilation errors

* Add link to issue

* Address review comments

* Update comment
This commit is contained in:
Marcin S
2023-04-21 12:40:09 +02:00
committed by GitHub
parent ac09a84115
commit e277f95b3b
42 changed files with 878 additions and 627 deletions
+2
View File
@@ -65,9 +65,11 @@ use std::{
time::{Duration, SystemTime},
};
/// Contains the bytes for a successfully compiled artifact.
pub struct CompiledArtifact(Vec<u8>);
impl CompiledArtifact {
/// Creates a `CompiledArtifact`.
pub fn new(code: Vec<u8>) -> Self {
Self(code)
}
+1 -15
View File
@@ -16,7 +16,7 @@
use crate::prepare::PrepareStats;
use parity_scale_codec::{Decode, Encode};
use std::{any::Any, fmt};
use std::fmt;
/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
/// successful
@@ -126,17 +126,3 @@ impl From<PrepareError> for ValidationError {
}
}
}
/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
pub(crate) fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unknown panic payload".to_string(),
},
}
}
+3 -3
View File
@@ -18,10 +18,10 @@
//!
//! The validation host [runs the queue][`start`] communicating with it by sending [`ToQueue`]
//! messages. The queue will spawn workers in new processes. Those processes should jump to
//! [`worker_entrypoint`].
//! `polkadot_node_core_pvf_worker::execute_worker_entrypoint`.
mod queue;
mod worker;
mod worker_intf;
pub use queue::{start, PendingExecutionRequest, ToQueue};
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
pub use worker_intf::{Handshake as ExecuteHandshake, Response as ExecuteResponse};
+10 -5
View File
@@ -16,7 +16,7 @@
//! A queue that handles requests for PVF execution.
use super::worker::Outcome;
use super::worker_intf::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
@@ -416,7 +416,8 @@ async fn spawn_worker_task(
use futures_timer::Delay;
loop {
match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await
match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout)
.await
{
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
Err(err) => {
@@ -460,9 +461,13 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome =
super::worker::start_work(idle, job.artifact.clone(), job.exec_timeout, job.params)
.await;
let outcome = super::worker_intf::start_work(
idle,
job.artifact.clone(),
job.exec_timeout,
job.params,
)
.await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
.boxed(),
@@ -14,28 +14,23 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Host interface to the execute worker.
use crate::{
artifacts::ArtifactPathId,
executor_intf::Executor,
worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr,
WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use futures::FutureExt;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult;
use polkadot_primitives::ExecutorParams;
use std::{
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
time::Duration,
};
use std::{path::Path, time::Duration};
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
@@ -185,17 +180,6 @@ async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Re
framed_send(stream, &handshake.encode()).await
}
async fn recv_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
let handshake_enc = framed_recv(stream).await?;
let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_handshake: failed to decode Handshake".to_owned(),
)
})?;
Ok(handshake)
}
async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
@@ -207,29 +191,6 @@ async fn send_request(
framed_send(stream, &execution_timeout.encode()).await
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>, Duration)> {
let artifact_path = framed_recv(stream).await?;
let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
let params = framed_recv(stream).await?;
let execution_timeout = framed_recv(stream).await?;
let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_request: failed to decode duration".to_string(),
)
})?;
Ok((artifact_path, params, execution_timeout))
}
async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
framed_send(stream, &response.encode()).await
}
async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
let response_bytes = framed_recv(stream).await?;
Response::decode(&mut &response_bytes[..]).map_err(|e| {
@@ -240,28 +201,43 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
})
}
/// The payload of the one-time handshake that is done when a worker process is created. Carries
/// data from the host to the worker.
#[derive(Encode, Decode)]
struct Handshake {
executor_params: ExecutorParams,
pub struct Handshake {
/// The executor parameters.
pub executor_params: ExecutorParams,
}
/// The response from an execution job on the worker.
#[derive(Encode, Decode)]
pub enum Response {
Ok { result_descriptor: ValidationResult, duration: Duration },
/// The job completed successfully.
Ok {
/// The result of parachain validation.
result_descriptor: ValidationResult,
/// The amount of CPU time taken by the job.
duration: Duration,
},
/// The candidate is invalid.
InvalidCandidate(String),
/// The job timed out.
TimedOut,
/// Some internal error occurred. Should only be used for errors independent of the candidate.
InternalError(String),
}
impl Response {
fn format_invalid(ctx: &'static str, msg: &str) -> Self {
/// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty).
pub fn format_invalid(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InvalidCandidate(ctx.to_string())
} else {
Self::InvalidCandidate(format!("{}: {}", ctx, msg))
}
}
fn format_internal(ctx: &'static str, msg: &str) -> Self {
/// Creates an internal response from a context `ctx` and a message `msg` (which can be empty).
pub fn format_internal(ctx: &'static str, msg: &str) -> Self {
if msg.is_empty() {
Self::InternalError(ctx.to_string())
} else {
@@ -269,110 +245,3 @@ impl Response {
}
}
}
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: validating artifact {}",
artifact_path.display(),
);
// Used to signal to the cpu time monitor thread that it can finish.
let (finished_tx, finished_rx) = channel::<()>();
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor.
let cpu_time_monitor_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
})
.fuse();
let executor_2 = executor.clone();
let execute_fut = rt_handle
.spawn_blocking(move || {
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
})
.fuse();
pin_mut!(cpu_time_monitor_fut);
pin_mut!(execute_fut);
let response = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will
// finish in the background.
cpu_time_monitor_res = cpu_time_monitor_fut => {
match cpu_time_monitor_res {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
);
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
}
},
execute_res = execute_fut => {
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
},
};
send_response(&mut stream, response).await?;
}
});
}
fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
executor: Arc<Executor>,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
// TODO: Re-evaluate after <https://github.com/paritytech/substrate/issues/13860>.
let file_metadata = std::fs::metadata(artifact_path);
if let Err(err) = file_metadata {
return Response::format_internal("execute: could not find or open file", &err.to_string())
}
let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
// [`executor_intf::prepare`].
executor.execute(artifact_path.as_ref(), params)
} {
Err(err) => return Response::format_invalid("execute", &err),
Ok(d) => d,
};
let duration = cpu_time_start.elapsed();
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) =>
return Response::format_invalid("validation result decoding failed", &err.to_string()),
Ok(r) => r,
};
Response::Ok { result_descriptor, duration }
}
-432
View File
@@ -1,432 +0,0 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Interface to the Substrate Executor
use polkadot_primitives::{ExecutorParam, ExecutorParams};
use sc_executor_common::{
runtime_blob::RuntimeBlob,
wasm_runtime::{HeapAllocStrategy, InvokeMethod, WasmModule as _},
};
use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics};
use sp_core::storage::{ChildInfo, TrackedStorageKey};
use sp_externalities::MultiRemovalResults;
use std::{
any::{Any, TypeId},
path::Path,
};
// Memory configuration
//
// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate
// Runtime instance's linear memory. The exact number of pages is a sum of whatever the WASM blob
// itself requests (by default at least enough to hold the data section as well as have some space
// left for the stack; this is, of course, overridable at link time when compiling the runtime)
// plus the number of pages specified in the `extra_heap_pages` passed to the executor.
//
// By default, rustc (or `lld` specifically) should allocate 1 MiB for the shadow stack, or 16 pages.
// The data section for runtimes are typically rather small and can fit in a single digit number of
// WASM pages, so let's say an extra 16 pages. Thus let's assume that 32 pages or 2 MiB are used for
// these needs by default.
const DEFAULT_HEAP_PAGES_ESTIMATE: u32 = 32;
const EXTRA_HEAP_PAGES: u32 = 2048;
/// The number of bytes devoted for the stack during wasm execution of a PVF.
const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;
// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED
// They are used as base values for the execution environment parametrization.
// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform
// a runtime upgrade to make them active.
const DEFAULT_CONFIG: Config = Config {
allow_missing_func_imports: true,
cache_path: None,
semantics: Semantics {
heap_alloc_strategy: sc_executor_common::wasm_runtime::HeapAllocStrategy::Dynamic {
maximum_pages: Some(DEFAULT_HEAP_PAGES_ESTIMATE + EXTRA_HEAP_PAGES),
},
instantiation_strategy:
sc_executor_wasmtime::InstantiationStrategy::RecreateInstanceCopyOnWrite,
// Enable deterministic stack limit to pin down the exact number of items the wasmtime stack
// can contain before it traps with stack overflow.
//
// Here is how the values below were chosen.
//
// At the moment of writing, the default native stack size limit is 1 MiB. Assuming a logical item
// (see the docs about the field and the instrumentation algorithm) is 8 bytes, 1 MiB can
// fit 2x 65536 logical items.
//
// Since reaching the native stack limit is undesirable, we halve the logical item limit and
// also increase the native 256x. This hopefully should preclude wasm code from reaching
// the stack limit set by the wasmtime.
deterministic_stack_limit: Some(DeterministicStackLimit {
logical_max: 65536,
native_stack_max: NATIVE_STACK_MAX,
}),
canonicalize_nans: true,
// Rationale for turning the multi-threaded compilation off is to make the preparation time
// easily reproducible and as deterministic as possible.
//
// Currently the prepare queue doesn't distinguish between precheck and prepare requests.
// On the one hand, it simplifies the code, on the other, however, slows down compile times
// for execute requests. This behavior may change in future.
parallel_compilation: false,
// WASM extensions. Only those that are meaningful to us may be controlled here. By default,
// we're using WASM MVP, which means all the extensions are disabled. Nevertheless, some
// extensions (e.g., sign extension ops) are enabled by Wasmtime and cannot be disabled.
wasm_reference_types: false,
wasm_simd: false,
wasm_bulk_memory: false,
wasm_multi_value: false,
},
};
/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds.
pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error::WasmError> {
let blob = RuntimeBlob::new(code)?;
// It's assumed this function will take care of any prevalidation logic
// that needs to be done.
//
// Do nothing for now.
Ok(blob)
}
/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled
/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk.
pub fn prepare(
blob: RuntimeBlob,
executor_params: &ExecutorParams,
) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
let semantics = params_to_wasmtime_semantics(executor_params)
.map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
}
fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, String> {
let mut sem = DEFAULT_CONFIG.semantics.clone();
let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
stack_limit
} else {
return Err("No default stack limit set".to_owned())
};
for p in par.iter() {
match p {
ExecutorParam::MaxMemoryPages(max_pages) =>
sem.heap_alloc_strategy =
HeapAllocStrategy::Dynamic { maximum_pages: Some(*max_pages) },
ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm,
ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm,
ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true,
ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet
ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here
}
}
sem.deterministic_stack_limit = Some(stack_limit);
Ok(sem)
}
pub struct Executor {
thread_pool: rayon::ThreadPool,
config: Config,
}
impl Executor {
pub fn new(params: ExecutorParams) -> Result<Self, String> {
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
// that the stack space is actually available.
//
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
// guard page and the Rust stack overflow handler will be triggered. That leads to an
// **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
// will abort and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
//
// Hence we need to increase it.
//
// The simplest way to fix that is to spawn a thread with the desired stack limit. In order
// to avoid costs of creating a thread, we use a thread pool. The execution is
// single-threaded hence the thread pool has only one thread.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.stack_size(thread_stack_size)
.build()
.map_err(|e| format!("Failed to create thread pool: {:?}", e))?;
let mut config = DEFAULT_CONFIG.clone();
config.semantics = params_to_wasmtime_semantics(&params)?;
Ok(Self { thread_pool, config })
}
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
/// upon success.
///
/// # Safety
///
/// The caller must ensure that the compiled artifact passed here was:
/// 1) produced by [`prepare`],
/// 2) written to the disk as a file,
/// 3) was not modified,
/// 4) will not be modified while any runtime using this artifact is alive, or is being
/// instantiated.
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
&self,
compiled_artifact_path: &Path,
params: &[u8],
) -> Result<Vec<u8>, String> {
let mut result = None;
self.thread_pool.scope({
let result = &mut result;
move |s| {
s.spawn(move |_| {
// spawn does not return a value, so we need to use a variable to pass the result.
*result = Some(
do_execute(compiled_artifact_path, self.config.clone(), params)
.map_err(|err| format!("execute error: {:?}", err)),
);
});
}
});
result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string()))
}
}
unsafe fn do_execute(
compiled_artifact_path: &Path,
config: Config,
params: &[u8],
) -> Result<Vec<u8>, sc_executor_common::error::Error> {
let mut extensions = sp_externalities::Extensions::new();
extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));
let mut ext = ValidationExternalities(extensions);
sc_executor::with_externalities_safe(&mut ext, || {
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
compiled_artifact_path,
config,
)?;
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
})?
}
type HostFunctions = (
sp_io::misc::HostFunctions,
sp_io::crypto::HostFunctions,
sp_io::hashing::HostFunctions,
sp_io::allocator::HostFunctions,
sp_io::logging::HostFunctions,
sp_io::trie::HostFunctions,
);
/// The validation externalities that will panic on any storage related access.
struct ValidationExternalities(sp_externalities::Extensions);
impl sp_externalities::Externalities for ValidationExternalities {
fn storage(&self, _: &[u8]) -> Option<Vec<u8>> {
panic!("storage: unsupported feature for parachain validation")
}
fn storage_hash(&self, _: &[u8]) -> Option<Vec<u8>> {
panic!("storage_hash: unsupported feature for parachain validation")
}
fn child_storage_hash(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
panic!("child_storage_hash: unsupported feature for parachain validation")
}
fn child_storage(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
panic!("child_storage: unsupported feature for parachain validation")
}
fn kill_child_storage(
&mut self,
_child_info: &ChildInfo,
_maybe_limit: Option<u32>,
_maybe_cursor: Option<&[u8]>,
) -> MultiRemovalResults {
panic!("kill_child_storage: unsupported feature for parachain validation")
}
fn clear_prefix(
&mut self,
_prefix: &[u8],
_maybe_limit: Option<u32>,
_maybe_cursor: Option<&[u8]>,
) -> MultiRemovalResults {
panic!("clear_prefix: unsupported feature for parachain validation")
}
fn clear_child_prefix(
&mut self,
_child_info: &ChildInfo,
_prefix: &[u8],
_maybe_limit: Option<u32>,
_maybe_cursor: Option<&[u8]>,
) -> MultiRemovalResults {
panic!("clear_child_prefix: unsupported feature for parachain validation")
}
fn place_storage(&mut self, _: Vec<u8>, _: Option<Vec<u8>>) {
panic!("place_storage: unsupported feature for parachain validation")
}
fn place_child_storage(&mut self, _: &ChildInfo, _: Vec<u8>, _: Option<Vec<u8>>) {
panic!("place_child_storage: unsupported feature for parachain validation")
}
fn storage_root(&mut self, _: sp_core::storage::StateVersion) -> Vec<u8> {
panic!("storage_root: unsupported feature for parachain validation")
}
fn child_storage_root(&mut self, _: &ChildInfo, _: sp_core::storage::StateVersion) -> Vec<u8> {
panic!("child_storage_root: unsupported feature for parachain validation")
}
fn next_child_storage_key(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
panic!("next_child_storage_key: unsupported feature for parachain validation")
}
fn next_storage_key(&self, _: &[u8]) -> Option<Vec<u8>> {
panic!("next_storage_key: unsupported feature for parachain validation")
}
fn storage_append(&mut self, _key: Vec<u8>, _value: Vec<u8>) {
panic!("storage_append: unsupported feature for parachain validation")
}
fn storage_start_transaction(&mut self) {
panic!("storage_start_transaction: unsupported feature for parachain validation")
}
fn storage_rollback_transaction(&mut self) -> Result<(), ()> {
panic!("storage_rollback_transaction: unsupported feature for parachain validation")
}
fn storage_commit_transaction(&mut self) -> Result<(), ()> {
panic!("storage_commit_transaction: unsupported feature for parachain validation")
}
fn wipe(&mut self) {
panic!("wipe: unsupported feature for parachain validation")
}
fn commit(&mut self) {
panic!("commit: unsupported feature for parachain validation")
}
fn read_write_count(&self) -> (u32, u32, u32, u32) {
panic!("read_write_count: unsupported feature for parachain validation")
}
fn reset_read_write_count(&mut self) {
panic!("reset_read_write_count: unsupported feature for parachain validation")
}
fn get_whitelist(&self) -> Vec<TrackedStorageKey> {
panic!("get_whitelist: unsupported feature for parachain validation")
}
fn set_whitelist(&mut self, _: Vec<TrackedStorageKey>) {
panic!("set_whitelist: unsupported feature for parachain validation")
}
fn set_offchain_storage(&mut self, _: &[u8], _: std::option::Option<&[u8]>) {
panic!("set_offchain_storage: unsupported feature for parachain validation")
}
fn get_read_and_written_keys(&self) -> Vec<(Vec<u8>, u32, u32, bool)> {
panic!("get_read_and_written_keys: unsupported feature for parachain validation")
}
}
impl sp_externalities::ExtensionStore for ValidationExternalities {
fn extension_by_type_id(&mut self, type_id: TypeId) -> Option<&mut dyn Any> {
self.0.get_mut(type_id)
}
fn register_extension_with_type_id(
&mut self,
type_id: TypeId,
extension: Box<dyn sp_externalities::Extension>,
) -> Result<(), sp_externalities::Error> {
self.0.register_with_type_id(type_id, extension)
}
fn deregister_extension_by_type_id(
&mut self,
type_id: TypeId,
) -> Result<(), sp_externalities::Error> {
if self.0.deregister(type_id) {
Ok(())
} else {
Err(sp_externalities::Error::ExtensionIsNotRegistered(type_id))
}
}
}
struct ReadRuntimeVersion;
impl sp_core::traits::ReadRuntimeVersion for ReadRuntimeVersion {
fn read_runtime_version(
&self,
wasm_code: &[u8],
_ext: &mut dyn sp_externalities::Externalities,
) -> Result<Vec<u8>, String> {
let blob = RuntimeBlob::uncompress_if_needed(wasm_code)
.map_err(|e| format!("Failed to read the PVF runtime blob: {:?}", e))?;
match sc_executor::read_embedded_version(&blob)
.map_err(|e| format!("Failed to read the static section from the PVF blob: {:?}", e))?
{
Some(version) => {
use parity_scale_codec::Encode;
Ok(version.encode())
},
None => Err("runtime version section is not found".to_string()),
}
}
}
+13 -19
View File
@@ -29,11 +29,11 @@
//!
//! Then using the handle the client can send three types of requests:
//!
//! (a) PVF pre-checking. This takes the PVF [code][`Pvf`] and tries to prepare it (verify and
//! (a) PVF pre-checking. This takes the `Pvf` code and tries to prepare it (verify and
//! compile) in order to pre-check its validity.
//!
//! (b) PVF execution. This accepts the PVF [`params`][`polkadot_parachain::primitives::ValidationParams`]
//! and the PVF [code][`Pvf`], prepares (verifies and compiles) the code, and then executes PVF
//! and the `Pvf` code, prepares (verifies and compiles) the code, and then executes PVF
//! with the `params`.
//!
//! (c) Heads up. This request allows to signal that the given PVF may be needed soon and that it
@@ -91,7 +91,6 @@
mod artifacts;
mod error;
mod execute;
mod executor_intf;
mod host;
mod metrics;
mod prepare;
@@ -99,27 +98,22 @@ mod priority;
mod pvf;
mod worker_common;
#[doc(hidden)]
pub mod testing;
#[doc(hidden)]
pub use sp_tracing;
pub use artifacts::CompiledArtifact;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use execute::{ExecuteHandshake, ExecuteResponse};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub use prepare::MemoryAllocationStats;
pub use prepare::{MemoryStats, PrepareStats};
pub use priority::Priority;
pub use pvf::PvfPrepData;
pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
pub use execute::worker_entrypoint as execute_worker_entrypoint;
pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
pub use executor_intf::{prepare, prevalidate};
pub use sc_executor_common;
pub use sp_maybe_compressed_blob;
pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};
const LOG_TARGET: &str = "parachain::pvf";
#[doc(hidden)]
pub mod testing {
pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
}
@@ -1,237 +0,0 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Memory stats for preparation.
//!
//! Right now we gather three measurements:
//!
//! - `ru_maxrss` (resident set size) from `getrusage`.
//! - `resident` memory stat provided by `tikv-malloc-ctl`.
//! - `allocated` memory stat also from `tikv-malloc-ctl`.
//!
//! Currently we are only logging these for the purposes of gathering data. In the future, we may
//! use these stats to reject PVFs during pre-checking. See
//! <https://github.com/paritytech/polkadot/issues/6472#issuecomment-1381941762> for more
//! background.
use parity_scale_codec::{Decode, Encode};
/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
/// supported by the OS, `ru_maxrss`.
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
#[cfg(target_os = "linux")]
pub max_rss: Option<i64>,
}
/// Statistics of collected memory metrics.
#[non_exhaustive]
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryAllocationStats {
/// Total resident memory, in bytes.
pub resident: u64,
/// Total allocated memory, in bytes.
pub allocated: u64,
}
/// Module for the memory tracker. The memory tracker runs in its own thread, where it polls memory
/// usage at an interval.
///
/// NOTE: Requires jemalloc enabled.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub mod memory_tracker {
use super::*;
use crate::LOG_TARGET;
use std::{
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats, Error};
use tokio::task::JoinHandle;
#[derive(Clone)]
struct MemoryAllocationTracker {
epoch: tikv_jemalloc_ctl::epoch_mib,
allocated: stats::allocated_mib,
resident: stats::resident_mib,
}
impl MemoryAllocationTracker {
pub fn new() -> Result<Self, Error> {
Ok(Self {
epoch: epoch::mib()?,
allocated: stats::allocated::mib()?,
resident: stats::resident::mib()?,
})
}
pub fn snapshot(&self) -> Result<MemoryAllocationStats, Error> {
// update stats by advancing the allocation epoch
self.epoch.advance()?;
// Convert to `u64`, as `usize` is not `Encode`able.
let allocated = self.allocated.read()? as u64;
let resident = self.resident.read()? as u64;
Ok(MemoryAllocationStats { allocated, resident })
}
}
/// Runs a thread in the background that observes memory statistics. The goal is to try to get
/// accurate stats during preparation.
///
/// # Algorithm
///
/// 1. Create the memory tracker.
///
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
/// allocation epoch.
///
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
/// the maximum observed values.
///
/// # Errors
///
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
/// are used for informational purposes (logging) only.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
// Apart from that, there is not really a science to this number.
const POLL_INTERVAL: Duration = Duration::from_millis(100);
let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
let mut max_stats = MemoryAllocationStats::default();
let mut update_stats = || -> Result<(), String> {
let current_stats = tracker.snapshot().map_err(|err| err.to_string())?;
if current_stats.resident > max_stats.resident {
max_stats.resident = current_stats.resident;
}
if current_stats.allocated > max_stats.allocated {
max_stats.allocated = current_stats.allocated;
}
Ok(())
};
loop {
// Take a snapshot and update the max stats.
update_stats()?;
// Sleep.
match finished_rx.recv_timeout(POLL_INTERVAL) {
// Received finish signal.
Ok(()) => {
update_stats()?;
return Ok(max_stats)
},
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) =>
return Err("memory_tracker_loop: finished_rx disconnected".into()),
}
}
}
/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
/// error handling.
pub async fn get_memory_tracker_loop_stats(
fut: JoinHandle<Result<MemoryAllocationStats, String>>,
tx: Sender<()>,
worker_pid: u32,
) -> Option<MemoryAllocationStats> {
// Signal to the memory tracker thread to terminate.
if let Err(err) = tx.send(()) {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error sending signal to memory tracker_thread: {}",
err
);
None
} else {
// Join on the thread handle.
match fut.await {
Ok(Ok(stats)) => Some(stats),
Ok(Err(err)) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error occurred in the memory tracker thread: {}", err
);
None
},
Err(err) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error joining on memory tracker thread: {}", err
);
None
},
}
}
}
}
/// Module for dealing with the `ru_maxrss` (peak resident memory) stat from `getrusage`.
///
/// NOTE: `getrusage` with the `RUSAGE_THREAD` parameter is only supported on Linux. `RUSAGE_SELF`
/// works on MacOS, but we need to get the max rss only for the preparation thread. Gettng it for
/// the current process would conflate the stats of previous jobs run by the process.
#[cfg(target_os = "linux")]
pub mod max_rss_stat {
use crate::LOG_TARGET;
use core::mem::MaybeUninit;
use libc::{getrusage, rusage, RUSAGE_THREAD};
use std::io;
/// Get the rusage stats for the current thread.
fn getrusage_thread() -> io::Result<rusage> {
let mut result: MaybeUninit<rusage> = MaybeUninit::zeroed();
// SAFETY: `result` is a valid pointer, so calling this is safe.
if unsafe { getrusage(RUSAGE_THREAD, result.as_mut_ptr()) } == -1 {
return Err(io::Error::last_os_error())
}
// SAFETY: `result` was successfully initialized by `getrusage`.
unsafe { Ok(result.assume_init()) }
}
/// Gets the `ru_maxrss` for the current thread.
pub fn get_max_rss_thread() -> io::Result<i64> {
// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works.
getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))
}
/// Extracts the max_rss stat and logs any error.
pub fn extract_max_rss_stat(max_rss: io::Result<i64>, worker_pid: u32) -> Option<i64> {
max_rss
.map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"error getting `ru_maxrss` in preparation thread: {}",
err
);
err
})
.ok()
}
}
+28 -7
View File
@@ -20,23 +20,44 @@
//! (by running [`start_pool`]).
//!
//! The pool will spawn workers in new processes and those should execute pass control to
//! [`worker_entrypoint`].
//! `polkadot_node_core_pvf_worker::prepare_worker_entrypoint`.
mod memory_stats;
mod pool;
mod queue;
mod worker;
mod worker_intf;
pub use memory_stats::MemoryStats;
pub use pool::start as start_pool;
pub use queue::{start as start_queue, FromQueue, ToQueue};
pub use worker::worker_entrypoint;
use parity_scale_codec::{Decode, Encode};
/// Preparation statistics, including the CPU time and memory taken.
#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct PrepareStats {
cpu_time_elapsed: std::time::Duration,
memory_stats: MemoryStats,
/// The CPU time that elapsed for the preparation job.
pub cpu_time_elapsed: std::time::Duration,
/// The observed memory statistics for the preparation job.
pub memory_stats: MemoryStats,
}
/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if
/// supported by the OS, `ru_maxrss`.
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. `None` if an error occurred.
#[cfg(target_os = "linux")]
pub max_rss: Option<i64>,
}
/// Statistics of collected memory metrics.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryAllocationStats {
/// Total resident memory, in bytes.
pub resident: u64,
/// Total allocated memory, in bytes.
pub allocated: u64,
}
+3 -3
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::worker::{self, Outcome};
use super::worker_intf::{self, Outcome};
use crate::{
error::{PrepareError, PrepareResult},
metrics::Metrics,
@@ -250,7 +250,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po
use futures_timer::Delay;
loop {
match worker::spawn(&program_path, spawn_timeout).await {
match worker_intf::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err);
@@ -271,7 +271,7 @@ async fn start_work_task<Timer>(
artifact_path: PathBuf,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
let outcome = worker_intf::start_work(&metrics, idle, pvf, &cache_path, artifact_path).await;
PoolEvent::StartWork(worker, outcome)
}
+1 -1
View File
@@ -226,7 +226,7 @@ async fn handle_enqueue(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash(),
?priority,
preparation_timeout = ?pvf.prep_timeout,
preparation_timeout = ?pvf.prep_timeout(),
"PVF is enqueued for preparation.",
);
queue.metrics.prepare_enqueued();
@@ -14,33 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
#[cfg(target_os = "linux")]
use super::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use super::memory_stats::MemoryStats;
//! Host interface to the prepare worker.
use crate::{
artifacts::CompiledArtifact,
error::{PrepareError, PrepareResult},
metrics::Metrics,
prepare::PrepareStats,
pvf::PvfPrepData,
worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker,
SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{
panic,
path::{Path, PathBuf},
sync::mpsc::channel,
time::Duration,
};
use tokio::{io, net::UnixStream};
@@ -104,7 +95,7 @@ pub async fn start_work(
);
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
let preparation_timeout = pvf.prep_timeout;
let preparation_timeout = pvf.prep_timeout();
if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await {
gum::warn!(
target: LOG_TARGET,
@@ -285,28 +276,6 @@ async fn send_request(
Ok(())
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
let pvf = framed_recv(stream).await?;
let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
)
})?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
Ok((pvf, tmp_file))
}
async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
framed_send(stream, &result.encode()).await
}
async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareResult> {
let result = framed_recv(stream).await?;
let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
@@ -325,158 +294,3 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
})?;
Ok(result)
}
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
///
/// # Flow
///
/// This runs the following in a loop:
///
/// 1. Get the code and parameters for preparation from the host.
///
/// 2. Start a memory tracker in a separate thread.
///
/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
///
/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor
/// thread will trigger first.
///
/// 5. Stop the memory tracker and get the stats.
///
/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
///
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
loop {
let (pvf, dest) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: preparing artifact",
);
let cpu_time_start = ProcessTime::now();
let preparation_timeout = pvf.prep_timeout;
// Run the memory tracker.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
})
.fuse();
// Spawn another thread for preparation.
let prepare_fut = rt_handle
.spawn_blocking(move || {
let result = prepare_artifact(pvf);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
let result = result.map(|artifact| (artifact, get_max_rss_thread()));
result
})
.fuse();
pin_mut!(cpu_time_monitor_fut);
pin_mut!(prepare_fut);
let result = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will
// finish in the background.
join_res = cpu_time_monitor_fut => {
match join_res {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
);
Err(PrepareError::TimedOut)
},
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
Err(err) => Err(PrepareError::IoErr(err.to_string())),
}
},
prepare_res = prepare_fut => {
let cpu_time_elapsed = cpu_time_start.elapsed();
let _ = cpu_time_monitor_tx.send(());
match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
},
Ok(ok) => {
// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats =
get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
#[cfg(target_os = "linux")]
let (ok, max_rss) = ok;
let memory_stats = MemoryStats {
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
memory_tracker_stats,
#[cfg(target_os = "linux")]
max_rss: extract_max_rss_stat(max_rss, worker_pid),
};
// Write the serialized artifact into a temp file.
//
// PVF host only keeps artifacts statuses in its memory, successfully
// compiled code gets stored on the disk (and consequently deserialized
// by execute-workers). The prepare worker is only required to send `Ok`
// to the pool to indicate the success.
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: writing artifact to {}",
dest.display(),
);
tokio::fs::write(&dest, &ok).await?;
Ok(PrepareStats{cpu_time_elapsed, memory_stats})
},
}
},
};
send_response(&mut stream, result).await?;
}
});
}
fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(&pvf.code()) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};
match crate::executor_intf::prepare(blob, &pvf.executor_params()) {
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
})
.map_err(|panic_payload| {
PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload))
})
.and_then(|inner_result| inner_result)
}
+11 -6
View File
@@ -36,13 +36,13 @@ use crate::host::tests::TEST_PREPARATION_TIMEOUT;
#[derive(Clone, Encode, Decode)]
pub struct PvfPrepData {
/// Wasm code (uncompressed)
pub(crate) code: Arc<Vec<u8>>,
code: Arc<Vec<u8>>,
/// Wasm code hash
pub(crate) code_hash: ValidationCodeHash,
code_hash: ValidationCodeHash,
/// Executor environment parameters for the session for which artifact is prepared
pub(crate) executor_params: Arc<ExecutorParams>,
executor_params: Arc<ExecutorParams>,
/// Preparation timeout
pub(crate) prep_timeout: Duration,
prep_timeout: Duration,
}
impl PvfPrepData {
@@ -69,15 +69,20 @@ impl PvfPrepData {
}
/// Returns PVF code
pub(crate) fn code(&self) -> Arc<Vec<u8>> {
pub fn code(&self) -> Arc<Vec<u8>> {
self.code.clone()
}
/// Returns executor params
pub(crate) fn executor_params(&self) -> Arc<ExecutorParams> {
pub fn executor_params(&self) -> Arc<ExecutorParams> {
self.executor_params.clone()
}
/// Returns preparation timeout.
pub fn prep_timeout(&self) -> Duration {
self.prep_timeout
}
/// Creates a structure for tests
#[cfg(test)]
pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self {
-97
View File
@@ -1,97 +0,0 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Various things for testing other crates.
//!
//! N.B. This is not guarded with some feature flag. Overexposing items here may affect the final
//! artifact even for production builds.
use polkadot_primitives::ExecutorParams;
pub mod worker_common {
pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
}
/// A function that emulates the stitches together behaviors of the preparation and the execution
/// worker in a single synchronous function.
pub fn validate_candidate(
code: &[u8],
params: &[u8],
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
use crate::executor_intf::{prepare, prevalidate, Executor};
let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024)
.expect("Decompressing code failed");
let blob = prevalidate(&code)?;
let artifact = prepare(blob, &ExecutorParams::default())?;
let tmpdir = tempfile::tempdir()?;
let artifact_path = tmpdir.path().join("blob");
std::fs::write(&artifact_path, &artifact)?;
let executor = Executor::new(ExecutorParams::default())?;
let result = unsafe {
// SAFETY: This is trivially safe since the artifact is obtained by calling `prepare`
// and is written into a temporary directory in an unmodified state.
executor.execute(&artifact_path, params)?
};
Ok(result)
}
/// Use this macro to declare a `fn main() {}` that will check the arguments and dispatch them to
/// the appropriate worker, making the executable that can be used for spawning workers.
#[macro_export]
macro_rules! decl_puppet_worker_main {
() => {
fn main() {
$crate::sp_tracing::try_init_simple();
let args = std::env::args().collect::<Vec<_>>();
if args.len() < 3 {
panic!("wrong number of arguments");
}
let mut version = None;
let mut socket_path: &str = "";
for i in 2..args.len() {
match args[i].as_ref() {
"--socket-path" => socket_path = args[i + 1].as_str(),
"--node-version" => version = Some(args[i + 1].as_str()),
_ => (),
}
}
let subcommand = &args[1];
match subcommand.as_ref() {
"exit" => {
std::process::exit(1);
},
"sleep" => {
std::thread::sleep(std::time::Duration::from_secs(5));
},
"prepare-worker" => {
$crate::prepare_worker_entrypoint(&socket_path, version);
},
"execute-worker" => {
$crate::execute_worker_entrypoint(&socket_path, version);
},
other => panic!("unknown subcommand: {}", other),
}
}
};
}
+3 -117
View File
@@ -17,8 +17,7 @@
//! Common logic for implementation of worker processes.
use crate::LOG_TARGET;
use cpu_time::ProcessTime;
use futures::{never::Never, FutureExt as _};
use futures::FutureExt as _;
use futures_timer::Delay;
use pin_project::pin_project;
use rand::Rng;
@@ -26,7 +25,6 @@ use std::{
fmt, mem,
path::{Path, PathBuf},
pin::Pin,
sync::mpsc::{Receiver, RecvTimeoutError},
task::{Context, Poll},
time::Duration,
};
@@ -34,17 +32,12 @@ use tokio::{
io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf},
net::{UnixListener, UnixStream},
process,
runtime::{Handle, Runtime},
};
/// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
/// wall clock time). This is lenient because CPU time may go slower than wall clock time.
pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
/// This is publicly exposed only for integration tests.
#[doc(hidden)]
pub async fn spawn_with_program_path(
@@ -171,92 +164,6 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
tmpfile_in(prefix, &temp_dir).await
}
pub fn worker_event_loop<F, Fut>(
debug_id: &'static str,
socket_path: &str,
node_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(Handle, UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
let worker_pid = std::process::id();
gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id);
// Check for a mismatch between the node and worker versions.
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
kill_parent_node_in_emergency();
let err: io::Result<Never> =
Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"));
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker({}): {:?}", debug_id, err);
return
}
}
// Run the main worker loop.
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
.block_on(async move {
let stream = UnixStream::connect(socket_path).await?;
let _ = tokio::fs::remove_file(socket_path).await;
let result = event_loop(handle.clone(), stream).await;
result
})
// It's never `Ok` because it's `Ok(Never)`.
.unwrap_err();
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {:?}", debug_id, err);
// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
// but may be in the future.
rt.shutdown_background();
}
/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout.
///
/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return
/// `None` if the other thread finishes first, without us timing out.
///
/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or
/// execution, to be killed by the host. We do not kill the process here because it would interfere
/// with the proper handling of this error.
pub fn cpu_time_monitor_loop(
cpu_time_start: ProcessTime,
timeout: Duration,
finished_rx: Receiver<()>,
) -> Option<Duration> {
loop {
let cpu_time_elapsed = cpu_time_start.elapsed();
// Treat the timeout as CPU time, which is less subject to variance due to load.
if cpu_time_elapsed <= timeout {
// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
// is wall clock time. The CPU clock may be slower than the wall clock.
let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
match finished_rx.recv_timeout(sleep_interval) {
// Received finish signal.
Ok(()) => return None,
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return None,
}
}
return Some(cpu_time_elapsed)
}
}
/// A struct that represents an idle worker.
///
/// This struct is supposed to be used as a token that is passed by move into a subroutine that
@@ -405,12 +312,7 @@ pub fn path_to_bytes(path: &Path) -> &[u8] {
path.to_str().expect("non-UTF-8 path").as_bytes()
}
/// Interprets the given bytes as a path. Returns `None` if the given bytes do not constitute a
/// a proper utf-8 string.
pub fn bytes_to_path(bytes: &[u8]) -> Option<PathBuf> {
std::str::from_utf8(bytes).ok().map(PathBuf::from)
}
/// Write some data prefixed by its length into `w`.
pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
let len_buf = buf.len().to_le_bytes();
w.write_all(&len_buf).await?;
@@ -418,6 +320,7 @@ pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::R
Ok(())
}
/// Read some data prefixed by its length from `r`.
pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> {
let mut len_buf = [0u8; mem::size_of::<usize>()];
r.read_exact(&mut len_buf).await?;
@@ -426,20 +329,3 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
r.read_exact(&mut buf).await?;
Ok(buf)
}
/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
/// no leftover artifacts are possible.
fn kill_parent_node_in_emergency() {
unsafe {
// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
// some corner cases, which is checked. `kill()` never fails.
let ppid = libc::getppid();
if ppid > 1 {
libc::kill(ppid, libc::SIGTERM);
}
}
}