mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
pvf: ensure enough stack space (#5712)
* pvf: ensure enough stack space * fix typos Co-authored-by: Andronik <write@reusable.software> * Use rayon to cache the thread Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Generated
+4
-3
@@ -1348,12 +1348,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.5"
|
||||
version = "0.8.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
|
||||
checksum = "8ff1f980957787286a554052d03c7aee98d99cc32e09f6d45f0a814133c87978"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"lazy_static",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6768,6 +6768,7 @@ dependencies = [
|
||||
"polkadot-node-subsystem-util",
|
||||
"polkadot-parachain",
|
||||
"rand 0.8.5",
|
||||
"rayon",
|
||||
"sc-executor",
|
||||
"sc-executor-common",
|
||||
"sc-executor-wasmtime",
|
||||
|
||||
@@ -20,6 +20,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
|
||||
pin-project = "1.0.9"
|
||||
rand = "0.8.5"
|
||||
tempfile = "3.3.0"
|
||||
rayon = "1.5.1"
|
||||
parity-scale-codec = { version = "3.1.2", default-features = false, features = ["derive"] }
|
||||
polkadot-parachain = { path = "../../../parachain" }
|
||||
polkadot-core-primitives = { path = "../../../core-primitives" }
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use std::any::Any;
|
||||
|
||||
/// Result of PVF preparation performed by the validation host.
|
||||
pub type PrepareResult = Result<(), PrepareError>;
|
||||
@@ -108,3 +109,17 @@ impl From<PrepareError> for ValidationError {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to convert an opaque panic payload to a string.
|
||||
///
|
||||
/// This is a best effort, and is not guaranteed to provide the most accurate value.
|
||||
pub(crate) fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
|
||||
match payload.downcast::<&'static str>() {
|
||||
Ok(msg) => msg.to_string(),
|
||||
Err(payload) => match payload.downcast::<String>() {
|
||||
Ok(msg) => *msg,
|
||||
// At least we tried...
|
||||
Err(_) => "unknown panic payload".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use crate::{
|
||||
artifacts::ArtifactPathId,
|
||||
executor_intf::TaskExecutor,
|
||||
executor_intf::Executor,
|
||||
worker_common::{
|
||||
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
|
||||
worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
|
||||
@@ -184,8 +184,8 @@ impl Response {
|
||||
/// the path to the socket used to communicate with the host.
|
||||
pub fn worker_entrypoint(socket_path: &str) {
|
||||
worker_event_loop("execute", socket_path, |mut stream| async move {
|
||||
let executor = TaskExecutor::new().map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::Other, format!("cannot create task executor: {}", e))
|
||||
let executor = Executor::new().map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
|
||||
})?;
|
||||
loop {
|
||||
let (artifact_path, params) = recv_request(&mut stream).await?;
|
||||
@@ -204,14 +204,14 @@ pub fn worker_entrypoint(socket_path: &str) {
|
||||
async fn validate_using_artifact(
|
||||
artifact_path: &Path,
|
||||
params: &[u8],
|
||||
spawner: &TaskExecutor,
|
||||
executor: &Executor,
|
||||
) -> Response {
|
||||
let validation_started_at = Instant::now();
|
||||
let descriptor_bytes = match unsafe {
|
||||
// SAFETY: this should be safe since the compiled artifact passed here comes from the
|
||||
// file created by the prepare workers. These files are obtained by calling
|
||||
// [`executor_intf::prepare`].
|
||||
crate::executor_intf::execute(artifact_path.as_ref(), params, spawner.clone())
|
||||
executor.execute(artifact_path.as_ref(), params)
|
||||
} {
|
||||
Err(err) => return Response::format_invalid("execute", &err.to_string()),
|
||||
Ok(d) => d,
|
||||
|
||||
@@ -43,6 +43,9 @@ use std::{
|
||||
const DEFAULT_HEAP_PAGES_ESTIMATE: u64 = 32;
|
||||
const EXTRA_HEAP_PAGES: u64 = 2048;
|
||||
|
||||
/// The number of bytes devoted for the stack during wasm execution of a PVF.
|
||||
const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;
|
||||
|
||||
const CONFIG: Config = Config {
|
||||
allow_missing_func_imports: true,
|
||||
cache_path: None,
|
||||
@@ -69,7 +72,7 @@ const CONFIG: Config = Config {
|
||||
// the stack limit set by the wasmtime.
|
||||
deterministic_stack_limit: Some(DeterministicStackLimit {
|
||||
logical_max: 65536,
|
||||
native_stack_max: 256 * 1024 * 1024,
|
||||
native_stack_max: NATIVE_STACK_MAX,
|
||||
}),
|
||||
canonicalize_nans: true,
|
||||
// Rationale for turning the multi-threaded compilation off is to make the preparation time
|
||||
@@ -98,20 +101,99 @@ pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error::
|
||||
sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics)
|
||||
}
|
||||
|
||||
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
|
||||
/// upon success.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that the compiled artifact passed here was:
|
||||
/// 1) produced by [`prepare`],
|
||||
/// 2) written to the disk as a file,
|
||||
/// 3) was not modified,
|
||||
/// 4) will not be modified while any runtime using this artifact is alive, or is being
|
||||
/// instantiated.
|
||||
///
|
||||
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
|
||||
pub unsafe fn execute(
|
||||
pub struct Executor {
|
||||
thread_pool: rayon::ThreadPool,
|
||||
spawner: TaskSpawner,
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
pub fn new() -> Result<Self, String> {
|
||||
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
|
||||
// That native code does not create any stacks and just reuses the stack of the thread that
|
||||
// wasmtime was invoked from.
|
||||
//
|
||||
// Also, we configure the executor to provide the deterministic stack and that requires
|
||||
// supplying the amount of the native stack space that wasm is allowed to use. This is
|
||||
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
|
||||
//
|
||||
// There are quirks to that configuration knob:
|
||||
//
|
||||
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
|
||||
// that the stack space is actually available.
|
||||
//
|
||||
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
|
||||
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
|
||||
// guard page and the Rust stack overflow handler will be triggered. That leads to an
|
||||
// **abort**.
|
||||
//
|
||||
// 2. It cannot and does not limit the stack space consumed by Rust code.
|
||||
//
|
||||
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
|
||||
// and that will abort the process as well.
|
||||
//
|
||||
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
|
||||
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
|
||||
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
|
||||
//
|
||||
// Hence we need to increase it.
|
||||
//
|
||||
// The simplest way to fix that is to spawn a thread with the desired stack limit. In order
|
||||
// to avoid costs of creating a thread, we use a thread pool. The execution is
|
||||
// single-threaded hence the thread pool has only one thread.
|
||||
//
|
||||
// The reasoning why we pick this particular size is:
|
||||
//
|
||||
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
|
||||
let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
|
||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(1)
|
||||
.stack_size(thread_stack_size)
|
||||
.build()
|
||||
.map_err(|e| format!("Failed to create thread pool: {:?}", e))?;
|
||||
|
||||
let spawner =
|
||||
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;
|
||||
|
||||
Ok(Self { thread_pool, spawner })
|
||||
}
|
||||
|
||||
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
|
||||
/// upon success.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that the compiled artifact passed here was:
|
||||
/// 1) produced by [`prepare`],
|
||||
/// 2) written to the disk as a file,
|
||||
/// 3) was not modified,
|
||||
/// 4) will not be modified while any runtime using this artifact is alive, or is being
|
||||
/// instantiated.
|
||||
///
|
||||
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
|
||||
pub unsafe fn execute(
|
||||
&self,
|
||||
compiled_artifact_path: &Path,
|
||||
params: &[u8],
|
||||
) -> Result<Vec<u8>, String> {
|
||||
let spawner = self.spawner.clone();
|
||||
let mut result = None;
|
||||
self.thread_pool.scope({
|
||||
let result = &mut result;
|
||||
move |s| {
|
||||
s.spawn(move |_| {
|
||||
// spawn does not return a value, so we need to use a variable to pass the result.
|
||||
*result = Some(
|
||||
do_execute(compiled_artifact_path, params, spawner)
|
||||
.map_err(|err| format!("execute error: {:?}", err)),
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn do_execute(
|
||||
compiled_artifact_path: &Path,
|
||||
params: &[u8],
|
||||
spawner: impl sp_core::traits::SpawnNamed + 'static,
|
||||
@@ -291,9 +373,9 @@ impl sp_externalities::ExtensionStore for ValidationExternalities {
|
||||
///
|
||||
/// This is a light handle meaning it will only clone the handle not create a new thread pool.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct TaskExecutor(futures::executor::ThreadPool);
|
||||
pub(crate) struct TaskSpawner(futures::executor::ThreadPool);
|
||||
|
||||
impl TaskExecutor {
|
||||
impl TaskSpawner {
|
||||
pub(crate) fn new() -> Result<Self, String> {
|
||||
futures::executor::ThreadPoolBuilder::new()
|
||||
.pool_size(4)
|
||||
@@ -304,7 +386,7 @@ impl TaskExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
impl sp_core::traits::SpawnNamed for TaskExecutor {
|
||||
impl sp_core::traits::SpawnNamed for TaskSpawner {
|
||||
fn spawn_blocking(
|
||||
&self,
|
||||
_task_name: &'static str,
|
||||
|
||||
@@ -30,7 +30,7 @@ use async_std::{
|
||||
};
|
||||
use parity_scale_codec::{Decode, Encode};
|
||||
use sp_core::hexdisplay::HexDisplay;
|
||||
use std::{any::Any, panic, sync::Arc, time::Duration};
|
||||
use std::{panic, sync::Arc, time::Duration};
|
||||
|
||||
/// The time period after which the preparation worker is considered unresponsive and will be killed.
|
||||
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
|
||||
@@ -294,20 +294,8 @@ fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
|
||||
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
|
||||
}
|
||||
})
|
||||
.map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload)))
|
||||
.map_err(|panic_payload| {
|
||||
PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload))
|
||||
})
|
||||
.and_then(|inner_result| inner_result)
|
||||
}
|
||||
|
||||
/// Attempt to convert an opaque panic payload to a string.
|
||||
///
|
||||
/// This is a best effort, and is not guaranteed to provide the most accurate value.
|
||||
fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
|
||||
match payload.downcast::<&'static str>() {
|
||||
Ok(msg) => msg.to_string(),
|
||||
Err(payload) => match payload.downcast::<String>() {
|
||||
Ok(msg) => *msg,
|
||||
// At least we tried...
|
||||
Err(_) => "unkown panic payload".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ pub fn validate_candidate(
|
||||
code: &[u8],
|
||||
params: &[u8],
|
||||
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
|
||||
use crate::executor_intf::{execute, prepare, prevalidate, TaskExecutor};
|
||||
use crate::executor_intf::{prepare, prevalidate, Executor};
|
||||
|
||||
let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024)
|
||||
.expect("Decompressing code failed");
|
||||
@@ -40,11 +40,11 @@ pub fn validate_candidate(
|
||||
let artifact_path = tmpdir.path().join("blob");
|
||||
std::fs::write(&artifact_path, &artifact)?;
|
||||
|
||||
let executor = TaskExecutor::new()?;
|
||||
let executor = Executor::new()?;
|
||||
let result = unsafe {
|
||||
// SAFETY: This is trivially safe since the artifact is obtained by calling `prepare`
|
||||
// and is written into a temporary directory in an unmodified state.
|
||||
execute(&artifact_path, params, executor)?
|
||||
executor.execute(&artifact_path, params)?
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
|
||||
Reference in New Issue
Block a user