diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index efdbeefe13..cf84486f32 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -24,8 +24,8 @@ #![warn(missing_docs)] use polkadot_node_core_pvf::{ - InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, - PvfWithExecutorParams, ValidationError, ValidationHost, + InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, PvfWithExecutorParams, + ValidationError, ValidationHost, }; use polkadot_node_primitives::{ BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, @@ -334,7 +334,7 @@ where &validation_code.0, VALIDATION_CODE_BOMB_LIMIT, ) { - Ok(code) => PvfWithExecutorParams::new(Pvf::from_code(code.into_owned()), executor_params), + Ok(code) => PvfWithExecutorParams::from_code(code.into_owned(), executor_params), Err(e) => { gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); return PreCheckOutcome::Invalid @@ -683,7 +683,7 @@ trait ValidationBackend { ) -> Result { // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf_with_params = - PvfWithExecutorParams::new(Pvf::from_code(raw_validation_code), executor_params); + PvfWithExecutorParams::from_code(raw_validation_code, executor_params); let mut validation_result = self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await; diff --git a/polkadot/node/core/pvf/src/executor_intf.rs b/polkadot/node/core/pvf/src/executor_intf.rs index cbd76be60c..e5efc90d16 100644 --- a/polkadot/node/core/pvf/src/executor_intf.rs +++ b/polkadot/node/core/pvf/src/executor_intf.rs @@ -104,14 +104,14 @@ pub fn prevalidate(code: &[u8]) -> Result Result, sc_executor_common::error::WasmError> { let semantics = params_to_wasmtime_semantics(executor_params) .map_err(|e| sc_executor_common::error::WasmError::Other(e))?; sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) } -fn params_to_wasmtime_semantics(par: ExecutorParams) -> Result { +fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result { let mut sem = DEFAULT_CONFIG.semantics.clone(); let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() { stack_limit @@ -186,7 +186,7 @@ impl Executor { TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?; let mut config = DEFAULT_CONFIG.clone(); - config.semantics = params_to_wasmtime_semantics(params)?; + config.semantics = params_to_wasmtime_semantics(¶ms)?; Ok(Self { thread_pool, spawner, config }) } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index b6f515b09d..93a4a59c83 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -523,7 +523,7 @@ async fn handle_execute_pvf( artifact: ArtifactPathId::new(artifact_id, cache_path), execution_timeout, params, - executor_params: pvf_with_params.executor_params(), + executor_params: (*pvf_with_params.executor_params()).clone(), result_tx, }, ) @@ -534,7 +534,7 @@ async fn handle_execute_pvf( artifact_id, execution_timeout, params, - pvf_with_params.executor_params(), + (*pvf_with_params.executor_params()).clone(), result_tx, ); }, @@ -556,7 +556,7 @@ async fn handle_execute_pvf( waiting_for_response: Vec::new(), num_failures: *num_failures, }; - let executor_params = pvf_with_params.executor_params().clone(); + let executor_params = (*pvf_with_params.executor_params()).clone(); send_prepare( prepare_queue, prepare::ToQueue::Enqueue { @@ -584,7 +584,7 @@ async fn handle_execute_pvf( } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and // PVF. - let executor_params = pvf_with_params.executor_params(); + let executor_params = (*pvf_with_params.executor_params()).clone(); artifacts.insert_preparing(artifact_id.clone(), Vec::new()); send_prepare( prepare_queue, diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 3de5495a2e..9462b4cab1 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -110,7 +110,7 @@ pub use sp_tracing; pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError}; pub use prepare::PrepareStats; pub use priority::Priority; -pub use pvf::{Pvf, PvfWithExecutorParams}; +pub use pvf::PvfWithExecutorParams; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; diff --git a/polkadot/node/core/pvf/src/prepare/pool.rs b/polkadot/node/core/pvf/src/prepare/pool.rs index 1c4f399f6e..5d1bb24818 100644 --- a/polkadot/node/core/pvf/src/prepare/pool.rs +++ b/polkadot/node/core/pvf/src/prepare/pool.rs @@ -18,6 +18,7 @@ use super::worker::{self, Outcome}; use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, + pvf::PvfWithExecutorParams, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, }; @@ -25,12 +26,10 @@ use always_assert::never; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; -use polkadot_primitives::vstaging::ExecutorParams; use slotmap::HopSlotMap; use std::{ fmt, path::{Path, PathBuf}, - sync::Arc, task::Poll, time::Duration, }; @@ -68,9 +67,8 @@ pub enum ToPool { /// sent until either `Concluded` or `Rip` message is received. StartWork { worker: Worker, - code: Arc>, + pvf_with_params: PvfWithExecutorParams, artifact_path: PathBuf, - executor_params: ExecutorParams, preparation_timeout: Duration, }, } @@ -216,7 +214,7 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { worker, code, artifact_path, executor_params, preparation_timeout } => { + ToPool::StartWork { worker, pvf_with_params, artifact_path, preparation_timeout } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -225,10 +223,9 @@ fn handle_to_pool( metrics.clone(), worker, idle, - code, + pvf_with_params, cache_path.to_owned(), artifact_path, - executor_params, preparation_timeout, preparation_timer, ) @@ -275,20 +272,18 @@ async fn start_work_task( metrics: Metrics, worker: Worker, idle: IdleWorker, - code: Arc>, + pvf_with_params: PvfWithExecutorParams, cache_path: PathBuf, artifact_path: PathBuf, - executor_params: ExecutorParams, preparation_timeout: Duration, _preparation_timer: Option, ) -> PoolEvent { let outcome = worker::start_work( &metrics, idle, - code, + pvf_with_params, &cache_path, artifact_path, - executor_params, preparation_timeout, ) .await; diff --git a/polkadot/node/core/pvf/src/prepare/queue.rs b/polkadot/node/core/pvf/src/prepare/queue.rs index 939f42ea62..8eee1289ab 100644 --- a/polkadot/node/core/pvf/src/prepare/queue.rs +++ b/polkadot/node/core/pvf/src/prepare/queue.rs @@ -445,9 +445,8 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal &mut queue.to_pool_tx, pool::ToPool::StartWork { worker, - code: job_data.pvf_with_params.code(), + pvf_with_params: job_data.pvf_with_params.clone(), artifact_path, - executor_params: job_data.pvf_with_params.executor_params(), preparation_timeout: job_data.preparation_timeout, }, ) diff --git a/polkadot/node/core/pvf/src/prepare/worker.rs b/polkadot/node/core/pvf/src/prepare/worker.rs index 8fba877a93..015cd2e836 100644 --- a/polkadot/node/core/pvf/src/prepare/worker.rs +++ b/polkadot/node/core/pvf/src/prepare/worker.rs @@ -24,6 +24,7 @@ use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, prepare::PrepareStats, + pvf::PvfWithExecutorParams, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -34,12 +35,12 @@ use crate::{ use cpu_time::ProcessTime; use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; -use polkadot_primitives::vstaging::ExecutorParams; + use sp_core::hexdisplay::HexDisplay; use std::{ panic, path::{Path, PathBuf}, - sync::{mpsc::channel, Arc}, + sync::mpsc::channel, time::Duration, }; use tokio::{io, net::UnixStream}; @@ -83,10 +84,9 @@ pub enum Outcome { pub async fn start_work( metrics: &Metrics, worker: IdleWorker, - code: Arc>, + pvf_with_params: PvfWithExecutorParams, cache_path: &Path, artifact_path: PathBuf, - executor_params: ExecutorParams, preparation_timeout: Duration, ) -> Outcome { let IdleWorker { stream, pid } = worker; @@ -100,7 +100,7 @@ pub async fn start_work( with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { if let Err(err) = - send_request(&mut stream, code, &tmp_file, &executor_params, preparation_timeout).await + send_request(&mut stream, pvf_with_params, &tmp_file, preparation_timeout).await { gum::warn!( target: LOG_TARGET, @@ -273,22 +273,27 @@ where async fn send_request( stream: &mut UnixStream, - code: Arc>, + pvf_with_params: PvfWithExecutorParams, tmp_file: &Path, - executor_params: &ExecutorParams, preparation_timeout: Duration, ) -> io::Result<()> { - framed_send(stream, &code).await?; + framed_send(stream, &pvf_with_params.encode()).await?; framed_send(stream, path_to_bytes(tmp_file)).await?; - framed_send(stream, &executor_params.encode()).await?; framed_send(stream, &preparation_timeout.encode()).await?; Ok(()) } async fn recv_request( stream: &mut UnixStream, -) -> io::Result<(Vec, PathBuf, ExecutorParams, Duration)> { - let code = framed_recv(stream).await?; +) -> io::Result<(PvfWithExecutorParams, PathBuf, Duration)> { + let pvf_with_params = framed_recv(stream).await?; + let pvf_with_params = + PvfWithExecutorParams::decode(&mut &pvf_with_params[..]).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("prepare pvf recv_request: failed to decode PvfWithExecutorParams: {}", e), + ) + })?; let tmp_file = framed_recv(stream).await?; let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { io::Error::new( @@ -296,13 +301,6 @@ async fn recv_request( "prepare pvf recv_request: non utf-8 artifact path".to_string(), ) })?; - let executor_params_enc = framed_recv(stream).await?; - let executor_params = ExecutorParams::decode(&mut &executor_params_enc[..]).map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "prepare pvf recv_request: failed to decode ExecutorParams".to_string(), - ) - })?; let preparation_timeout = framed_recv(stream).await?; let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| { io::Error::new( @@ -310,7 +308,7 @@ async fn recv_request( format!("prepare pvf recv_request: failed to decode duration: {:?}", e), ) })?; - Ok((code, tmp_file, executor_params, preparation_timeout)) + Ok((pvf_with_params, tmp_file, preparation_timeout)) } async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> { @@ -362,8 +360,7 @@ pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { let worker_pid = std::process::id(); - let (code, dest, executor_params, preparation_timeout) = - recv_request(&mut stream).await?; + let (pvf_with_params, dest, preparation_timeout) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, %worker_pid, @@ -388,7 +385,7 @@ pub fn worker_entrypoint(socket_path: &str) { // Spawn another thread for preparation. let prepare_fut = rt_handle .spawn_blocking(move || { - let result = prepare_artifact(&code, executor_params); + let result = prepare_artifact(pvf_with_params); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] @@ -471,16 +468,15 @@ pub fn worker_entrypoint(socket_path: &str) { } fn prepare_artifact( - code: &[u8], - executor_params: ExecutorParams, + pvf_with_params: PvfWithExecutorParams, ) -> Result { panic::catch_unwind(|| { - let blob = match crate::executor_intf::prevalidate(code) { + let blob = match crate::executor_intf::prevalidate(&pvf_with_params.code()) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; - match crate::executor_intf::prepare(blob, executor_params) { + match crate::executor_intf::prepare(blob, &pvf_with_params.executor_params()) { Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } diff --git a/polkadot/node/core/pvf/src/pvf.rs b/polkadot/node/core/pvf/src/pvf.rs index e0284a2608..439e36a373 100644 --- a/polkadot/node/core/pvf/src/pvf.rs +++ b/polkadot/node/core/pvf/src/pvf.rs @@ -15,81 +15,79 @@ // along with Polkadot. If not, see . use crate::artifacts::ArtifactId; +use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationCodeHash; use polkadot_primitives::vstaging::ExecutorParams; use sp_core::blake2_256; -use std::{fmt, sync::Arc}; +use std::{ + cmp::{Eq, PartialEq}, + fmt, + sync::Arc, +}; -/// A struct that carries code of a parachain validation function and its hash. +/// A struct that carries code of a parachain validation function, its hash, and a corresponding +/// set of executor parameters. /// /// Should be cheap to clone. -#[derive(Clone)] -pub struct Pvf { +#[derive(Clone, Encode, Decode)] +pub struct PvfWithExecutorParams { pub(crate) code: Arc>, pub(crate) code_hash: ValidationCodeHash, -} - -impl fmt::Debug for Pvf { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Pvf {{ code, code_hash: {:?} }}", self.code_hash) - } -} - -impl Pvf { - /// Returns an instance of the PVF out of the given PVF code. - pub fn from_code(code: Vec) -> Self { - let code = Arc::new(code); - let code_hash = blake2_256(&code).into(); - Self { code, code_hash } - } - - /// Creates a new PVF which artifact id can be uniquely identified by the given number. - #[cfg(test)] - pub(crate) fn from_discriminator(num: u32) -> Self { - let descriminator_buf = num.to_le_bytes().to_vec(); - Pvf::from_code(descriminator_buf) - } -} - -/// Coupling PVF code with executor params -#[derive(Debug, Clone)] -pub struct PvfWithExecutorParams { - pvf: Pvf, - executor_params: Arc, + pub(crate) executor_params: Arc, } impl PvfWithExecutorParams { - /// Creates a new PVF-ExecutorParams pair structure - pub fn new(pvf: Pvf, executor_params: ExecutorParams) -> Self { - Self { pvf, executor_params: Arc::new(executor_params) } + /// Returns an instance of the PVF out of the given PVF code and executor params. + pub fn from_code(code: Vec, executor_params: ExecutorParams) -> Self { + let code = Arc::new(code); + let code_hash = blake2_256(&code).into(); + let executor_params = Arc::new(executor_params); + Self { code, code_hash, executor_params } } /// Returns artifact ID that corresponds to the PVF with given executor params pub(crate) fn as_artifact_id(&self) -> ArtifactId { - ArtifactId::new(self.pvf.code_hash, self.executor_params.hash()) + ArtifactId::new(self.code_hash, self.executor_params.hash()) } /// Returns validation code hash for the PVF pub(crate) fn code_hash(&self) -> ValidationCodeHash { - self.pvf.code_hash + self.code_hash } /// Returns PVF code pub(crate) fn code(&self) -> Arc> { - self.pvf.code.clone() + self.code.clone() } /// Returns executor params - pub(crate) fn executor_params(&self) -> ExecutorParams { - (*self.executor_params).clone() + pub(crate) fn executor_params(&self) -> Arc { + self.executor_params.clone() } /// Creates a structure for tests #[cfg(test)] pub(crate) fn from_discriminator(num: u32) -> Self { - Self { - pvf: Pvf::from_discriminator(num), - executor_params: Arc::new(ExecutorParams::default()), - } + let descriminator_buf = num.to_le_bytes().to_vec(); + Self::from_code(descriminator_buf, ExecutorParams::default()) } } + +impl fmt::Debug for PvfWithExecutorParams { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Pvf {{ code, code_hash: {:?}, executor_params: {:?} }}", + self.code_hash, self.executor_params + ) + } +} + +impl PartialEq for PvfWithExecutorParams { + fn eq(&self, other: &Self) -> bool { + self.code_hash == other.code_hash && + self.executor_params.hash() == other.executor_params.hash() + } +} + +impl Eq for PvfWithExecutorParams {} diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs index 2abc1d07a8..e41b769440 100644 --- a/polkadot/node/core/pvf/src/testing.rs +++ b/polkadot/node/core/pvf/src/testing.rs @@ -37,7 +37,7 @@ pub fn validate_candidate( .expect("Decompressing code failed"); let blob = prevalidate(&code)?; - let artifact = prepare(blob, ExecutorParams::default())?; + let artifact = prepare(blob, &ExecutorParams::default())?; let tmpdir = tempfile::tempdir()?; let artifact_path = tmpdir.path().join("blob"); std::fs::write(&artifact_path, &artifact)?; diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index b540230c47..9fc329d591 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -17,7 +17,7 @@ use assert_matches::assert_matches; use parity_scale_codec::Encode as _; use polkadot_node_core_pvf::{ - start, Config, InvalidCandidate, Metrics, Pvf, PvfWithExecutorParams, ValidationError, + start, Config, InvalidCandidate, Metrics, PvfWithExecutorParams, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; @@ -69,7 +69,7 @@ impl TestHost { .lock() .await .execute_pvf( - PvfWithExecutorParams::new(Pvf::from_code(code.into()), executor_params), + PvfWithExecutorParams::from_code(code.into(), executor_params), TEST_EXECUTION_TIMEOUT, params.encode(), polkadot_node_core_pvf::Priority::Normal, diff --git a/polkadot/node/test/performance-test/src/lib.rs b/polkadot/node/test/performance-test/src/lib.rs index 9d01121b95..4ba86014e1 100644 --- a/polkadot/node/test/performance-test/src/lib.rs +++ b/polkadot/node/test/performance-test/src/lib.rs @@ -67,7 +67,7 @@ pub fn measure_pvf_prepare(wasm_code: &[u8]) -> Result // Recreate the pipeline from the pvf prepare worker. let blob = polkadot_node_core_pvf::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?; - polkadot_node_core_pvf::prepare(blob, ExecutorParams::default()) + polkadot_node_core_pvf::prepare(blob, &ExecutorParams::default()) .map_err(PerfCheckError::from)?; Ok(start.elapsed())