PVF: Instantiate wasm in pre-checking (#7246)

* PVF: Instantiate wasm in pre-checking

* Move `runtime_construction_check` to prepare thread, use bytes

* [minor] Update comment

* Fix compile error

* Update Cargo.lock

* Update docs

* Add some missing docs!
This commit is contained in:
Marcin S
2023-06-02 08:04:04 -04:00
committed by GitHub
parent 2d73e39d2f
commit 5bbb87c46b
16 changed files with 437 additions and 355 deletions
@@ -30,8 +30,9 @@ use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
executor_intf::Executor,
framed_recv, framed_send,
prepare::{MemoryStats, PrepareStats},
prepare::{MemoryStats, PrepareJobKind, PrepareStats},
pvf::PvfPrepData,
worker::{
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload,
@@ -40,6 +41,7 @@ use polkadot_node_core_pvf_common::{
},
ProcessTime,
};
use polkadot_primitives::ExecutorParams;
use std::{
path::PathBuf,
sync::{mpsc::channel, Arc},
@@ -117,7 +119,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
let worker_pid = std::process::id();
loop {
let (pvf, dest) = recv_request(&mut stream).await?;
let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
@@ -125,6 +127,8 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
);
let preparation_timeout = pvf.prep_timeout();
let prepare_job_kind = pvf.prep_kind();
let executor_params = (*pvf.executor_params()).clone();
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();
@@ -151,11 +155,23 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
let prepare_thread = thread::spawn_worker_thread(
"prepare thread",
move || {
let result = prepare_artifact(pvf, cpu_time_start);
#[allow(unused_mut)]
let mut result = prepare_artifact(pvf, cpu_time_start);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
let result = result.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
let mut result = result.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
// If we are pre-checking, check for runtime construction errors.
//
// As pre-checking is more strict than just preparation in terms of memory and
// time, it is okay to do extra checks here. This takes negligible time anyway.
if let PrepareJobKind::Prechecking = prepare_job_kind {
result = result.and_then(|output| {
runtime_construction_check(output.0.as_ref(), executor_params)?;
Ok(output)
});
}
result
},
@@ -203,9 +219,9 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
target: LOG_TARGET,
%worker_pid,
"worker: writing artifact to {}",
dest.display(),
temp_artifact_dest.display(),
);
tokio::fs::write(&dest, &artifact).await?;
tokio::fs::write(&temp_artifact_dest, &artifact).await?;
Ok(PrepareStats { cpu_time_elapsed, memory_stats })
},
@@ -257,3 +273,18 @@ fn prepare_artifact(
}
.map(|artifact| (artifact, cpu_time_start.elapsed()))
}
/// Try constructing the runtime to catch any instantiation errors during pre-checking.
fn runtime_construction_check(
artifact_bytes: &[u8],
executor_params: ExecutorParams,
) -> Result<(), PrepareError> {
let executor = Executor::new(executor_params)
.map_err(|e| PrepareError::RuntimeConstruction(format!("cannot create executor: {}", e)))?;
// SAFETY: We just compiled this artifact.
let result = unsafe { executor.create_runtime_from_bytes(&artifact_bytes) };
result
.map(|_runtime| ())
.map_err(|err| PrepareError::RuntimeConstruction(format!("{:?}", err)))
}