From 5338b23ca3e86dfafd4a83da7b006c7702e2ae53 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 9 Sep 2020 12:17:22 +0200 Subject: [PATCH] Make ValidationPool accepts execution mode to run custom command or in process validation (#1622) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Initial commit Forked at: cc19f134689c5c34de0056286bb8818c91001bfc Parent branch: origin/master * Propagate test mode all the way down to ValidationPool * Update validation/src/validation_service/mod.rs * Fix test * WIP Forked at: cc19f134689c5c34de0056286bb8818c91001bfc Parent branch: origin/master * Update service/src/lib.rs Co-authored-by: Bastian Köcher * Adapt code to review suggestions * Run validation inside the same process * Add test * CLEANUP Forked at: cc19f134689c5c34de0056286bb8818c91001bfc Parent branch: origin/master Co-authored-by: Bastian Köcher --- .../node/core/candidate-validation/src/lib.rs | 8 +- polkadot/parachain/src/wasm_executor/mod.rs | 17 +--- .../src/wasm_executor/validation_host.rs | 79 ++++++++++++++----- .../test-parachains/tests/adder/mod.rs | 48 ++++++++--- .../tests/wasm_executor/mod.rs | 25 ++++-- 5 files changed, 121 insertions(+), 56 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 600dde2c4e..f562a5ea6f 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -35,8 +35,10 @@ use polkadot_primitives::v1::{ ValidationCode, PoV, CandidateDescriptor, ValidationData, PersistedValidationData, TransientValidationData, OccupiedCoreAssumption, Hash, }; -use polkadot_parachain::wasm_executor::{self, ValidationPool, ExecutionMode, ValidationError, - InvalidCandidate as WasmInvalidCandidate}; +use polkadot_parachain::wasm_executor::{ + self, ValidationPool, ExecutionMode, ValidationError, + InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode, +}; use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams}; use parity_scale_codec::Encode; @@ -128,7 +130,7 @@ async fn run( ) -> SubsystemResult<()> { - let pool = ValidationPool::new(); + let pool = ValidationPool::new(ValidationExecutionMode::ExternalProcessSelfHost); loop { match ctx.recv().await? { diff --git a/polkadot/parachain/src/wasm_executor/mod.rs b/polkadot/parachain/src/wasm_executor/mod.rs index c9ab7587bf..32002c3ca4 100644 --- a/polkadot/parachain/src/wasm_executor/mod.rs +++ b/polkadot/parachain/src/wasm_executor/mod.rs @@ -28,7 +28,7 @@ use sp_externalities::Extensions; use sp_wasm_interface::HostFunctions as _; #[cfg(not(any(target_os = "android", target_os = "unknown")))] -pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC}; +pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode}; mod validation_host; @@ -66,8 +66,6 @@ pub enum ExecutionMode<'a> { Local, /// Remote execution in a spawned process. Remote(&'a ValidationPool), - /// Remote execution in a spawned test runner. - RemoteTest(&'a ValidationPool), } #[derive(Debug, derive_more::Display, derive_more::From)] @@ -143,11 +141,7 @@ pub fn validate_candidate( }, #[cfg(not(any(target_os = "android", target_os = "unknown")))] ExecutionMode::Remote(pool) => { - pool.validate_candidate(validation_code, params, false) - }, - #[cfg(not(any(target_os = "android", target_os = "unknown")))] - ExecutionMode::RemoteTest(pool) => { - pool.validate_candidate(validation_code, params, true) + pool.validate_candidate(validation_code, params) }, #[cfg(any(target_os = "android", target_os = "unknown"))] ExecutionMode::Remote(_pool) => @@ -156,13 +150,6 @@ pub fn validate_candidate( "Remote validator not available".to_string() ) as Box<_> ))), - #[cfg(any(target_os = "android", target_os = "unknown"))] - ExecutionMode::RemoteTest(_pool) => - Err(ValidationError::Internal(InternalError::System( - Box::::from( - "Remote validator not available".to_string() - ) as Box<_> - ))), } } diff --git a/polkadot/parachain/src/wasm_executor/validation_host.rs b/polkadot/parachain/src/wasm_executor/validation_host.rs index 238ed65c7a..249ad96467 100644 --- a/polkadot/parachain/src/wasm_executor/validation_host.rs +++ b/polkadot/parachain/src/wasm_executor/validation_host.rs @@ -16,7 +16,7 @@ #![cfg(not(any(target_os = "android", target_os = "unknown")))] -use std::{process, env, sync::Arc, sync::atomic}; +use std::{process, env, sync::Arc, sync::atomic, path::PathBuf}; use codec::{Decode, Encode}; use crate::primitives::{ValidationParams, ValidationResult}; use super::{ @@ -29,7 +29,6 @@ use log::{debug, trace}; use futures::executor::ThreadPool; use sp_core::traits::SpawnNamed; -const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; /// CLI Argument to start in validation worker mode. const WORKER_ARG: &'static str = "validation-worker"; const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; @@ -66,19 +65,40 @@ impl SpawnNamed for TaskExecutor { } } +/// The execution mode for the `ValidationPool`. +#[derive(Debug, Clone)] +pub enum ValidationExecutionMode { + /// The validation worker is ran in a thread inside the same process. + InProcess, + /// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed + /// following by the address of the shared memory. + ExternalProcessSelfHost, + /// The validation worker is ran using the command provided and the argument provided. The address of the shared + /// memory is added at the end of the arguments. + ExternalProcessCustomHost { + /// Path to the validation worker. The file must exists and be executable. + binary: PathBuf, + /// List of arguments passed to the validation worker. The address of the shared memory will be automatically + /// added after the arguments. + args: Vec, + }, +} + /// A pool of hosts. #[derive(Clone)] pub struct ValidationPool { hosts: Arc>>, + execution_mode: ValidationExecutionMode, } const DEFAULT_NUM_HOSTS: usize = 8; impl ValidationPool { /// Creates a validation pool with the default configuration. - pub fn new() -> ValidationPool { + pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool { ValidationPool { hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()), + execution_mode, } } @@ -90,16 +110,15 @@ impl ValidationPool { &self, validation_code: &[u8], params: ValidationParams, - test_mode: bool, ) -> Result { for host in self.hosts.iter() { if let Some(mut host) = host.try_lock() { - return host.validate_candidate(validation_code, params, test_mode); + return host.validate_candidate(validation_code, params, self.execution_mode.clone()); } } // all workers are busy, just wait for the first one - self.hosts[0].lock().validate_candidate(validation_code, params, test_mode) + self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone()) } } @@ -208,6 +227,7 @@ unsafe impl Send for ValidationHost {} #[derive(Default)] struct ValidationHost { worker: Option, + worker_thread: Option>>, memory: Option, id: u32, } @@ -233,7 +253,7 @@ impl ValidationHost { Ok(mem_config.create()?) } - fn start_worker(&mut self, test_mode: bool) -> Result<(), InternalError> { + fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> { if let Some(ref mut worker) = self.worker { // Check if still alive if let Ok(None) = worker.try_wait() { @@ -241,17 +261,38 @@ impl ValidationHost { return Ok(()); } } + if self.worker_thread.is_some() { + return Ok(()); + } + let memory = Self::create_memory()?; - let self_path = env::current_exe()?; - debug!("Starting worker at {:?}", self_path); - let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() }; - args.push(memory.get_os_path()); - let worker = process::Command::new(self_path) - .args(args) - .stdin(process::Stdio::piped()) - .spawn()?; - self.id = worker.id(); - self.worker = Some(worker); + + let mut run_worker_process = |cmd: PathBuf, args: Vec| -> Result<(), std::io::Error> { + debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path()); + let worker = process::Command::new(cmd) + .args(args) + .arg(memory.get_os_path()) + .stdin(process::Stdio::piped()) + .spawn()?; + self.id = worker.id(); + self.worker = Some(worker); + Ok(()) + }; + + match execution_mode { + ValidationExecutionMode::InProcess => { + let mem_id = memory.get_os_path().to_string(); + self.worker_thread = Some(std::thread::spawn(move || run_worker(mem_id.as_str()))); + }, + ValidationExecutionMode::ExternalProcessSelfHost => run_worker_process( + env::current_exe()?, + WORKER_ARGS.iter().map(|x| x.to_string()).collect(), + )?, + ValidationExecutionMode::ExternalProcessCustomHost { binary, args } => run_worker_process( + binary, + args, + )?, + }; memory.wait( Event::WorkerReady as usize, @@ -268,13 +309,13 @@ impl ValidationHost { &mut self, validation_code: &[u8], params: ValidationParams, - test_mode: bool, + execution_mode: ValidationExecutionMode, ) -> Result { if validation_code.len() > MAX_CODE_MEM { return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len()))); } // First, check if need to spawn the child process - self.start_worker(test_mode)?; + self.start_worker(execution_mode)?; let memory = self.memory.as_mut() .expect("memory is always `Some` after `start_worker` completes successfully"); { diff --git a/polkadot/parachain/test-parachains/tests/adder/mod.rs b/polkadot/parachain/test-parachains/tests/adder/mod.rs index 76924551ba..a052bf34b6 100644 --- a/polkadot/parachain/test-parachains/tests/adder/mod.rs +++ b/polkadot/parachain/test-parachains/tests/adder/mod.rs @@ -16,11 +16,16 @@ //! Basic parachain that adds a number as part of its state. -use parachain::primitives::{ - RelayChainBlockNumber, - BlockData as GenericBlockData, - HeadData as GenericHeadData, - ValidationParams, +const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; + +use parachain::{ + primitives::{ + RelayChainBlockNumber, + BlockData as GenericBlockData, + HeadData as GenericHeadData, + ValidationParams, + }, + wasm_executor::{ValidationPool, ValidationExecutionMode} }; use codec::{Decode, Encode}; @@ -52,8 +57,28 @@ fn hash_head(head: &HeadData) -> [u8; 32] { tiny_keccak::keccak256(head.encode().as_slice()) } +fn validation_pool() -> ValidationPool { + let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { + binary: std::env::current_exe().unwrap(), + args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), + }; + + ValidationPool::new(execution_mode) +} + #[test] -pub fn execute_good_on_parent() { +fn execute_good_on_parent_with_inprocess_validation() { + let pool = ValidationPool::new(ValidationExecutionMode::InProcess); + execute_good_on_parent(pool); +} + +#[test] +pub fn execute_good_on_parent_with_external_process_validation() { + let pool = validation_pool(); + execute_good_on_parent(pool); +} + +fn execute_good_on_parent(pool: ValidationPool) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], @@ -65,7 +90,6 @@ pub fn execute_good_on_parent() { add: 512, }; - let pool = parachain::wasm_executor::ValidationPool::new(); let ret = parachain::wasm_executor::validate_candidate( adder::wasm_binary_unwrap(), @@ -75,7 +99,7 @@ pub fn execute_good_on_parent() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), + parachain::wasm_executor::ExecutionMode::Remote(&pool), sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -91,7 +115,7 @@ fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; let mut last_state = 0; - let pool = parachain::wasm_executor::ValidationPool::new(); + let pool = validation_pool(); for add in 0..10 { let parent_head = HeadData { @@ -113,7 +137,7 @@ fn execute_good_chain_on_parent() { relay_chain_height: number as RelayChainBlockNumber + 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), + parachain::wasm_executor::ExecutionMode::Remote(&pool), sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -131,7 +155,7 @@ fn execute_good_chain_on_parent() { #[test] fn execute_bad_on_parent() { - let pool = parachain::wasm_executor::ValidationPool::new(); + let pool = validation_pool(); let parent_head = HeadData { number: 0, @@ -152,7 +176,7 @@ fn execute_bad_on_parent() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), + parachain::wasm_executor::ExecutionMode::Remote(&pool), sp_core::testing::TaskExecutor::new(), ).unwrap_err(); } diff --git a/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs b/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs index b4f2211baa..ddae34a6df 100644 --- a/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs +++ b/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs @@ -16,15 +16,26 @@ //! Basic parachain that adds a number as part of its state. +const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; + use crate::adder; use parachain::{ primitives::{BlockData, ValidationParams}, - wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC}, + wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode, ValidationPool}, }; +fn validation_pool() -> ValidationPool { + let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { + binary: std::env::current_exe().unwrap(), + args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), + }; + + ValidationPool::new(execution_mode) +} + #[test] fn terminates_on_timeout() { - let pool = parachain::wasm_executor::ValidationPool::new(); + let pool = validation_pool(); let result = parachain::wasm_executor::validate_candidate( halt::wasm_binary_unwrap(), @@ -34,7 +45,7 @@ fn terminates_on_timeout() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), + parachain::wasm_executor::ExecutionMode::Remote(&pool), sp_core::testing::TaskExecutor::new(), ); match result { @@ -43,12 +54,12 @@ fn terminates_on_timeout() { } // check that another parachain can validate normaly - adder::execute_good_on_parent(); + adder::execute_good_on_parent_with_external_process_validation(); } #[test] fn parallel_execution() { - let pool = parachain::wasm_executor::ValidationPool::new(); + let pool = validation_pool(); let start = std::time::Instant::now(); @@ -62,7 +73,7 @@ fn parallel_execution() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2), + parachain::wasm_executor::ExecutionMode::Remote(&pool2), sp_core::testing::TaskExecutor::new(), ).ok()); let _ = parachain::wasm_executor::validate_candidate( @@ -73,7 +84,7 @@ fn parallel_execution() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), + parachain::wasm_executor::ExecutionMode::Remote(&pool), sp_core::testing::TaskExecutor::new(), ); thread.join().unwrap();