From 84f2ab3d7f0baba04f57a2b70c2b0bf25b69fefa Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 11 Sep 2020 14:41:31 +0200 Subject: [PATCH] Forwardport: Validation: don't detect STDIN closing when running in process (#1695) (#1703) * Initial commit Forked at: 8b595446601be5c4569bcceafbc1f04c1a22e0c5 Parent branch: origin/master * Validation: don't detect STDIN closing when running in process (#1695) --- .../node/core/candidate-validation/src/lib.rs | 28 ++-- polkadot/parachain/src/wasm_executor/mod.rs | 48 +++++-- .../src/wasm_executor/validation_host.rs | 131 +++++++++--------- .../test-parachains/tests/adder/mod.rs | 31 ++--- .../tests/wasm_executor/mod.rs | 23 ++- 5 files changed, 137 insertions(+), 124 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index f562a5ea6f..fa61325b9c 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -37,7 +37,7 @@ use polkadot_primitives::v1::{ }; use polkadot_parachain::wasm_executor::{ self, ValidationPool, ExecutionMode, ValidationError, - InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode, + InvalidCandidate as WasmInvalidCandidate, }; use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams}; @@ -130,7 +130,7 @@ async fn run( ) -> SubsystemResult<()> { - let pool = ValidationPool::new(ValidationExecutionMode::ExternalProcessSelfHost); + let execution_mode = ExecutionMode::ExternalProcessSelfHost(ValidationPool::new()); loop { match ctx.recv().await? { @@ -145,7 +145,7 @@ async fn run( ) => { let res = spawn_validate_from_chain_state( &mut ctx, - Some(pool.clone()), + execution_mode.clone(), descriptor, pov, spawn.clone(), @@ -169,7 +169,7 @@ async fn run( ) => { let res = spawn_validate_exhaustive( &mut ctx, - Some(pool.clone()), + execution_mode.clone(), persisted_validation_data, transient_validation_data, validation_code, @@ -271,7 +271,7 @@ async fn check_assumption_validation_data( async fn spawn_validate_from_chain_state( ctx: &mut impl SubsystemContext, - validation_pool: Option, + execution_mode: ExecutionMode, descriptor: CandidateDescriptor, pov: Arc, spawn: impl SpawnNamed + 'static, @@ -288,7 +288,7 @@ async fn spawn_validate_from_chain_state( AssumptionCheckOutcome::Matches(validation_data, validation_code) => { return spawn_validate_exhaustive( ctx, - validation_pool, + execution_mode, validation_data.persisted, Some(validation_data.transient), validation_code, @@ -309,7 +309,7 @@ async fn spawn_validate_from_chain_state( AssumptionCheckOutcome::Matches(validation_data, validation_code) => { return spawn_validate_exhaustive( ctx, - validation_pool, + execution_mode, validation_data.persisted, Some(validation_data.transient), validation_code, @@ -330,7 +330,7 @@ async fn spawn_validate_from_chain_state( async fn spawn_validate_exhaustive( ctx: &mut impl SubsystemContext, - validation_pool: Option, + execution_mode: ExecutionMode, persisted_validation_data: PersistedValidationData, transient_validation_data: Option, validation_code: ValidationCode, @@ -341,7 +341,7 @@ async fn spawn_validate_exhaustive( let (tx, rx) = oneshot::channel(); let fut = async move { let res = validate_candidate_exhaustive::( - validation_pool, + execution_mode, persisted_validation_data, transient_validation_data, validation_code, @@ -422,22 +422,18 @@ trait ValidationBackend { struct RealValidationBackend; impl ValidationBackend for RealValidationBackend { - type Arg = Option; + type Arg = ExecutionMode; fn validate( - pool: Option, + execution_mode: ExecutionMode, validation_code: &ValidationCode, params: ValidationParams, spawn: S, ) -> Result { - let execution_mode = pool.as_ref() - .map(ExecutionMode::Remote) - .unwrap_or(ExecutionMode::Local); - wasm_executor::validate_candidate( &validation_code.0, params, - execution_mode, + &execution_mode, spawn, ) } diff --git a/polkadot/parachain/src/wasm_executor/mod.rs b/polkadot/parachain/src/wasm_executor/mod.rs index 32002c3ca4..1dde237f03 100644 --- a/polkadot/parachain/src/wasm_executor/mod.rs +++ b/polkadot/parachain/src/wasm_executor/mod.rs @@ -20,7 +20,7 @@ //! Assuming the parameters are correct, this module provides a wrapper around //! a WASM VM for re-execution of a parachain candidate. -use std::any::{TypeId, Any}; +use std::{any::{TypeId, Any}, path::PathBuf}; use crate::primitives::{ValidationParams, ValidationResult}; use codec::{Decode, Encode}; use sp_core::{storage::{ChildInfo, TrackedStorageKey}, traits::{CallInWasm, SpawnNamed}}; @@ -28,7 +28,7 @@ use sp_externalities::Extensions; use sp_wasm_interface::HostFunctions as _; #[cfg(not(any(target_os = "android", target_os = "unknown")))] -pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode}; +pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, WORKER_ARGS}; mod validation_host; @@ -58,16 +58,29 @@ pub fn run_worker(_: &str) -> Result<(), String> { Err("Cannot run validation worker on this platform".to_string()) } -/// WASM code execution mode. -/// -/// > Note: When compiling for WASM, the `Remote` variants are not available. -pub enum ExecutionMode<'a> { - /// Execute in-process. The execution can not be interrupted or aborted. - Local, - /// Remote execution in a spawned process. - Remote(&'a ValidationPool), +/// The execution mode for the `ValidationPool`. +#[derive(Clone)] +#[cfg_attr(not(any(target_os = "android", target_os = "unknown")), derive(Debug))] +pub enum ExecutionMode { + /// The validation worker is ran in a thread inside the same process. + InProcess, + /// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed + /// following by the address of the shared memory. + ExternalProcessSelfHost(ValidationPool), + /// The validation worker is ran using the command provided and the argument provided. The address of the shared + /// memory is added at the end of the arguments. + ExternalProcessCustomHost { + /// Validation pool. + pool: ValidationPool, + /// Path to the validation worker. The file must exists and be executable. + binary: PathBuf, + /// List of arguments passed to the validation worker. The address of the shared memory will be automatically + /// added after the arguments. + args: Vec, + }, } + #[derive(Debug, derive_more::Display, derive_more::From)] /// Candidate validation error. pub enum ValidationError { @@ -132,19 +145,24 @@ impl std::error::Error for ValidationError { pub fn validate_candidate( validation_code: &[u8], params: ValidationParams, - options: ExecutionMode<'_>, + execution_mode: &ExecutionMode, spawner: impl SpawnNamed + 'static, ) -> Result { - match options { - ExecutionMode::Local => { + match execution_mode { + ExecutionMode::InProcess => { validate_candidate_internal(validation_code, ¶ms.encode(), spawner) }, #[cfg(not(any(target_os = "android", target_os = "unknown")))] - ExecutionMode::Remote(pool) => { + ExecutionMode::ExternalProcessSelfHost(pool) => { pool.validate_candidate(validation_code, params) }, + #[cfg(not(any(target_os = "android", target_os = "unknown")))] + ExecutionMode::ExternalProcessCustomHost { pool, binary, args } => { + let args: Vec<&str> = args.iter().map(|x| x.as_str()).collect(); + pool.validate_candidate_custom(validation_code, params, binary, &args) + }, #[cfg(any(target_os = "android", target_os = "unknown"))] - ExecutionMode::Remote(_pool) => + ExecutionMode::ExternalProcessSelfHost(_) | ExecutionMode::ExternalProcessCustomHost { .. } => Err(ValidationError::Internal(InternalError::System( Box::::from( "Remote validator not available".to_string() diff --git a/polkadot/parachain/src/wasm_executor/validation_host.rs b/polkadot/parachain/src/wasm_executor/validation_host.rs index 249ad96467..07367d3318 100644 --- a/polkadot/parachain/src/wasm_executor/validation_host.rs +++ b/polkadot/parachain/src/wasm_executor/validation_host.rs @@ -29,9 +29,9 @@ use log::{debug, trace}; use futures::executor::ThreadPool; use sp_core::traits::SpawnNamed; -/// CLI Argument to start in validation worker mode. const WORKER_ARG: &'static str = "validation-worker"; -const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; +/// CLI Argument to start in validation worker mode. +pub const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; /// Execution timeout in seconds; #[cfg(debug_assertions)] @@ -65,60 +65,60 @@ impl SpawnNamed for TaskExecutor { } } -/// The execution mode for the `ValidationPool`. -#[derive(Debug, Clone)] -pub enum ValidationExecutionMode { - /// The validation worker is ran in a thread inside the same process. - InProcess, - /// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed - /// following by the address of the shared memory. - ExternalProcessSelfHost, - /// The validation worker is ran using the command provided and the argument provided. The address of the shared - /// memory is added at the end of the arguments. - ExternalProcessCustomHost { - /// Path to the validation worker. The file must exists and be executable. - binary: PathBuf, - /// List of arguments passed to the validation worker. The address of the shared memory will be automatically - /// added after the arguments. - args: Vec, - }, -} - /// A pool of hosts. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ValidationPool { hosts: Arc>>, - execution_mode: ValidationExecutionMode, } const DEFAULT_NUM_HOSTS: usize = 8; impl ValidationPool { /// Creates a validation pool with the default configuration. - pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool { + pub fn new() -> ValidationPool { ValidationPool { hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()), - execution_mode, } } - /// Validate a candidate under the given validation code using the next - /// free validation host. + /// Validate a candidate under the given validation code using the next free validation host. /// /// This will fail if the validation code is not a proper parachain validation module. + /// + /// This function will use `std::env::current_exe()` with the default arguments [`WORKER_ARGS`] to run the worker. pub fn validate_candidate( &self, validation_code: &[u8], params: ValidationParams, + ) -> Result { + self.validate_candidate_custom( + validation_code, + params, + &env::current_exe().map_err(|err| ValidationError::Internal(err.into()))?, + WORKER_ARGS, + ) + } + + /// Validate a candidate under the given validation code using the next free validation host. + /// + /// This will fail if the validation code is not a proper parachain validation module. + /// + /// This function will use the command and the arguments provided in the function's arguments to run the worker. + pub fn validate_candidate_custom( + &self, + validation_code: &[u8], + params: ValidationParams, + command: &PathBuf, + args: &[&str], ) -> Result { for host in self.hosts.iter() { if let Some(mut host) = host.try_lock() { - return host.validate_candidate(validation_code, params, self.execution_mode.clone()); + return host.validate_candidate(validation_code, params, command, args) } } // all workers are busy, just wait for the first one - self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone()) + self.hosts[0].lock().validate_candidate(validation_code, params, command, args) } } @@ -224,11 +224,32 @@ enum ValidationResultHeader { unsafe impl Send for ValidationHost {} -#[derive(Default)] +struct ValidationHostMemory(SharedMem); + +impl std::fmt::Debug for ValidationHostMemory { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "ValidationHostMemory") + } +} + +impl std::ops::Deref for ValidationHostMemory { + type Target = SharedMem; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for ValidationHostMemory { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Default, Debug)] struct ValidationHost { worker: Option, - worker_thread: Option>>, - memory: Option, + memory: Option, id: u32, } @@ -253,7 +274,7 @@ impl ValidationHost { Ok(mem_config.create()?) } - fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> { + fn start_worker(&mut self, cmd: &PathBuf, args: &[&str]) -> Result<(), InternalError> { if let Some(ref mut worker) = self.worker { // Check if still alive if let Ok(None) = worker.try_wait() { @@ -261,44 +282,23 @@ impl ValidationHost { return Ok(()); } } - if self.worker_thread.is_some() { - return Ok(()); - } let memory = Self::create_memory()?; - let mut run_worker_process = |cmd: PathBuf, args: Vec| -> Result<(), std::io::Error> { - debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path()); - let worker = process::Command::new(cmd) - .args(args) - .arg(memory.get_os_path()) - .stdin(process::Stdio::piped()) - .spawn()?; - self.id = worker.id(); - self.worker = Some(worker); - Ok(()) - }; - - match execution_mode { - ValidationExecutionMode::InProcess => { - let mem_id = memory.get_os_path().to_string(); - self.worker_thread = Some(std::thread::spawn(move || run_worker(mem_id.as_str()))); - }, - ValidationExecutionMode::ExternalProcessSelfHost => run_worker_process( - env::current_exe()?, - WORKER_ARGS.iter().map(|x| x.to_string()).collect(), - )?, - ValidationExecutionMode::ExternalProcessCustomHost { binary, args } => run_worker_process( - binary, - args, - )?, - }; + debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path()); + let worker = process::Command::new(cmd) + .args(args) + .arg(memory.get_os_path()) + .stdin(process::Stdio::piped()) + .spawn()?; + self.id = worker.id(); + self.worker = Some(worker); memory.wait( Event::WorkerReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize), )?; - self.memory = Some(memory); + self.memory = Some(ValidationHostMemory(memory)); Ok(()) } @@ -309,13 +309,14 @@ impl ValidationHost { &mut self, validation_code: &[u8], params: ValidationParams, - execution_mode: ValidationExecutionMode, + binary: &PathBuf, + args: &[&str], ) -> Result { if validation_code.len() > MAX_CODE_MEM { return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len()))); } // First, check if need to spawn the child process - self.start_worker(execution_mode)?; + self.start_worker(binary, args)?; let memory = self.memory.as_mut() .expect("memory is always `Some` after `start_worker` completes successfully"); { diff --git a/polkadot/parachain/test-parachains/tests/adder/mod.rs b/polkadot/parachain/test-parachains/tests/adder/mod.rs index a052bf34b6..2d8d228c86 100644 --- a/polkadot/parachain/test-parachains/tests/adder/mod.rs +++ b/polkadot/parachain/test-parachains/tests/adder/mod.rs @@ -25,7 +25,7 @@ use parachain::{ HeadData as GenericHeadData, ValidationParams, }, - wasm_executor::{ValidationPool, ValidationExecutionMode} + wasm_executor::{ValidationPool, ExecutionMode} }; use codec::{Decode, Encode}; @@ -57,28 +57,27 @@ fn hash_head(head: &HeadData) -> [u8; 32] { tiny_keccak::keccak256(head.encode().as_slice()) } -fn validation_pool() -> ValidationPool { - let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { +fn execution_mode() -> ExecutionMode { + ExecutionMode::ExternalProcessCustomHost { + pool: ValidationPool::new(), binary: std::env::current_exe().unwrap(), args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), - }; - - ValidationPool::new(execution_mode) + } } #[test] fn execute_good_on_parent_with_inprocess_validation() { - let pool = ValidationPool::new(ValidationExecutionMode::InProcess); - execute_good_on_parent(pool); + let execution_mode = ExecutionMode::InProcess; + execute_good_on_parent(execution_mode); } #[test] pub fn execute_good_on_parent_with_external_process_validation() { - let pool = validation_pool(); - execute_good_on_parent(pool); + let execution_mode = execution_mode(); + execute_good_on_parent(execution_mode); } -fn execute_good_on_parent(pool: ValidationPool) { +fn execute_good_on_parent(execution_mode: ExecutionMode) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], @@ -99,7 +98,7 @@ fn execute_good_on_parent(pool: ValidationPool) { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -115,7 +114,7 @@ fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; let mut last_state = 0; - let pool = validation_pool(); + let execution_mode = execution_mode(); for add in 0..10 { let parent_head = HeadData { @@ -137,7 +136,7 @@ fn execute_good_chain_on_parent() { relay_chain_height: number as RelayChainBlockNumber + 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -155,7 +154,7 @@ fn execute_good_chain_on_parent() { #[test] fn execute_bad_on_parent() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let parent_head = HeadData { number: 0, @@ -176,7 +175,7 @@ fn execute_bad_on_parent() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap_err(); } diff --git a/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs b/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs index ddae34a6df..600b25b9ee 100644 --- a/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs +++ b/polkadot/parachain/test-parachains/tests/wasm_executor/mod.rs @@ -21,21 +21,20 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; use crate::adder; use parachain::{ primitives::{BlockData, ValidationParams}, - wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode, ValidationPool}, + wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ExecutionMode, ValidationPool}, }; -fn validation_pool() -> ValidationPool { - let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { +fn execution_mode() -> ExecutionMode { + ExecutionMode::ExternalProcessCustomHost { + pool: ValidationPool::new(), binary: std::env::current_exe().unwrap(), args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), - }; - - ValidationPool::new(execution_mode) + } } #[test] fn terminates_on_timeout() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let result = parachain::wasm_executor::validate_candidate( halt::wasm_binary_unwrap(), @@ -45,7 +44,7 @@ fn terminates_on_timeout() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ); match result { @@ -59,11 +58,11 @@ fn terminates_on_timeout() { #[test] fn parallel_execution() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let start = std::time::Instant::now(); - let pool2 = pool.clone(); + let execution_mode2 = execution_mode.clone(); let thread = std::thread::spawn(move || parachain::wasm_executor::validate_candidate( halt::wasm_binary_unwrap(), @@ -73,7 +72,7 @@ fn parallel_execution() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool2), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).ok()); let _ = parachain::wasm_executor::validate_candidate( @@ -84,7 +83,7 @@ fn parallel_execution() { relay_chain_height: 1, hrmp_mqc_heads: Vec::new(), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode2, sp_core::testing::TaskExecutor::new(), ); thread.join().unwrap();