mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 07:58:00 +00:00
Make ValidationPool accepts execution mode to run custom command or in process validation (#1622)
* Initial commit Forked at:cc19f13468Parent branch: origin/master * Propagate test mode all the way down to ValidationPool * Update validation/src/validation_service/mod.rs * Fix test * WIP Forked at:cc19f13468Parent branch: origin/master * Update service/src/lib.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Adapt code to review suggestions * Run validation inside the same process * Add test * CLEANUP Forked at:cc19f13468Parent branch: origin/master Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -35,8 +35,10 @@ use polkadot_primitives::v1::{
|
||||
ValidationCode, PoV, CandidateDescriptor, ValidationData, PersistedValidationData,
|
||||
TransientValidationData, OccupiedCoreAssumption, Hash,
|
||||
};
|
||||
use polkadot_parachain::wasm_executor::{self, ValidationPool, ExecutionMode, ValidationError,
|
||||
InvalidCandidate as WasmInvalidCandidate};
|
||||
use polkadot_parachain::wasm_executor::{
|
||||
self, ValidationPool, ExecutionMode, ValidationError,
|
||||
InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode,
|
||||
};
|
||||
use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams};
|
||||
|
||||
use parity_scale_codec::Encode;
|
||||
@@ -128,7 +130,7 @@ async fn run(
|
||||
)
|
||||
-> SubsystemResult<()>
|
||||
{
|
||||
let pool = ValidationPool::new();
|
||||
let pool = ValidationPool::new(ValidationExecutionMode::ExternalProcessSelfHost);
|
||||
|
||||
loop {
|
||||
match ctx.recv().await? {
|
||||
|
||||
@@ -28,7 +28,7 @@ use sp_externalities::Extensions;
|
||||
use sp_wasm_interface::HostFunctions as _;
|
||||
|
||||
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
|
||||
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC};
|
||||
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode};
|
||||
|
||||
mod validation_host;
|
||||
|
||||
@@ -66,8 +66,6 @@ pub enum ExecutionMode<'a> {
|
||||
Local,
|
||||
/// Remote execution in a spawned process.
|
||||
Remote(&'a ValidationPool),
|
||||
/// Remote execution in a spawned test runner.
|
||||
RemoteTest(&'a ValidationPool),
|
||||
}
|
||||
|
||||
#[derive(Debug, derive_more::Display, derive_more::From)]
|
||||
@@ -143,11 +141,7 @@ pub fn validate_candidate(
|
||||
},
|
||||
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
|
||||
ExecutionMode::Remote(pool) => {
|
||||
pool.validate_candidate(validation_code, params, false)
|
||||
},
|
||||
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
|
||||
ExecutionMode::RemoteTest(pool) => {
|
||||
pool.validate_candidate(validation_code, params, true)
|
||||
pool.validate_candidate(validation_code, params)
|
||||
},
|
||||
#[cfg(any(target_os = "android", target_os = "unknown"))]
|
||||
ExecutionMode::Remote(_pool) =>
|
||||
@@ -156,13 +150,6 @@ pub fn validate_candidate(
|
||||
"Remote validator not available".to_string()
|
||||
) as Box<_>
|
||||
))),
|
||||
#[cfg(any(target_os = "android", target_os = "unknown"))]
|
||||
ExecutionMode::RemoteTest(_pool) =>
|
||||
Err(ValidationError::Internal(InternalError::System(
|
||||
Box::<dyn std::error::Error + Send + Sync>::from(
|
||||
"Remote validator not available".to_string()
|
||||
) as Box<_>
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
#![cfg(not(any(target_os = "android", target_os = "unknown")))]
|
||||
|
||||
use std::{process, env, sync::Arc, sync::atomic};
|
||||
use std::{process, env, sync::Arc, sync::atomic, path::PathBuf};
|
||||
use codec::{Decode, Encode};
|
||||
use crate::primitives::{ValidationParams, ValidationResult};
|
||||
use super::{
|
||||
@@ -29,7 +29,6 @@ use log::{debug, trace};
|
||||
use futures::executor::ThreadPool;
|
||||
use sp_core::traits::SpawnNamed;
|
||||
|
||||
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
|
||||
/// CLI Argument to start in validation worker mode.
|
||||
const WORKER_ARG: &'static str = "validation-worker";
|
||||
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
|
||||
@@ -66,19 +65,40 @@ impl SpawnNamed for TaskExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/// The execution mode for the `ValidationPool`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ValidationExecutionMode {
|
||||
/// The validation worker is ran in a thread inside the same process.
|
||||
InProcess,
|
||||
/// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed
|
||||
/// following by the address of the shared memory.
|
||||
ExternalProcessSelfHost,
|
||||
/// The validation worker is ran using the command provided and the argument provided. The address of the shared
|
||||
/// memory is added at the end of the arguments.
|
||||
ExternalProcessCustomHost {
|
||||
/// Path to the validation worker. The file must exists and be executable.
|
||||
binary: PathBuf,
|
||||
/// List of arguments passed to the validation worker. The address of the shared memory will be automatically
|
||||
/// added after the arguments.
|
||||
args: Vec<String>,
|
||||
},
|
||||
}
|
||||
|
||||
/// A pool of hosts.
|
||||
#[derive(Clone)]
|
||||
pub struct ValidationPool {
|
||||
hosts: Arc<Vec<Mutex<ValidationHost>>>,
|
||||
execution_mode: ValidationExecutionMode,
|
||||
}
|
||||
|
||||
const DEFAULT_NUM_HOSTS: usize = 8;
|
||||
|
||||
impl ValidationPool {
|
||||
/// Creates a validation pool with the default configuration.
|
||||
pub fn new() -> ValidationPool {
|
||||
pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool {
|
||||
ValidationPool {
|
||||
hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
|
||||
execution_mode,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,16 +110,15 @@ impl ValidationPool {
|
||||
&self,
|
||||
validation_code: &[u8],
|
||||
params: ValidationParams,
|
||||
test_mode: bool,
|
||||
) -> Result<ValidationResult, ValidationError> {
|
||||
for host in self.hosts.iter() {
|
||||
if let Some(mut host) = host.try_lock() {
|
||||
return host.validate_candidate(validation_code, params, test_mode);
|
||||
return host.validate_candidate(validation_code, params, self.execution_mode.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// all workers are busy, just wait for the first one
|
||||
self.hosts[0].lock().validate_candidate(validation_code, params, test_mode)
|
||||
self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,6 +227,7 @@ unsafe impl Send for ValidationHost {}
|
||||
#[derive(Default)]
|
||||
struct ValidationHost {
|
||||
worker: Option<process::Child>,
|
||||
worker_thread: Option<std::thread::JoinHandle<Result<(), String>>>,
|
||||
memory: Option<SharedMem>,
|
||||
id: u32,
|
||||
}
|
||||
@@ -233,7 +253,7 @@ impl ValidationHost {
|
||||
Ok(mem_config.create()?)
|
||||
}
|
||||
|
||||
fn start_worker(&mut self, test_mode: bool) -> Result<(), InternalError> {
|
||||
fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> {
|
||||
if let Some(ref mut worker) = self.worker {
|
||||
// Check if still alive
|
||||
if let Ok(None) = worker.try_wait() {
|
||||
@@ -241,17 +261,38 @@ impl ValidationHost {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
if self.worker_thread.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let memory = Self::create_memory()?;
|
||||
let self_path = env::current_exe()?;
|
||||
debug!("Starting worker at {:?}", self_path);
|
||||
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
|
||||
args.push(memory.get_os_path());
|
||||
let worker = process::Command::new(self_path)
|
||||
.args(args)
|
||||
.stdin(process::Stdio::piped())
|
||||
.spawn()?;
|
||||
self.id = worker.id();
|
||||
self.worker = Some(worker);
|
||||
|
||||
let mut run_worker_process = |cmd: PathBuf, args: Vec<String>| -> Result<(), std::io::Error> {
|
||||
debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path());
|
||||
let worker = process::Command::new(cmd)
|
||||
.args(args)
|
||||
.arg(memory.get_os_path())
|
||||
.stdin(process::Stdio::piped())
|
||||
.spawn()?;
|
||||
self.id = worker.id();
|
||||
self.worker = Some(worker);
|
||||
Ok(())
|
||||
};
|
||||
|
||||
match execution_mode {
|
||||
ValidationExecutionMode::InProcess => {
|
||||
let mem_id = memory.get_os_path().to_string();
|
||||
self.worker_thread = Some(std::thread::spawn(move || run_worker(mem_id.as_str())));
|
||||
},
|
||||
ValidationExecutionMode::ExternalProcessSelfHost => run_worker_process(
|
||||
env::current_exe()?,
|
||||
WORKER_ARGS.iter().map(|x| x.to_string()).collect(),
|
||||
)?,
|
||||
ValidationExecutionMode::ExternalProcessCustomHost { binary, args } => run_worker_process(
|
||||
binary,
|
||||
args,
|
||||
)?,
|
||||
};
|
||||
|
||||
memory.wait(
|
||||
Event::WorkerReady as usize,
|
||||
@@ -268,13 +309,13 @@ impl ValidationHost {
|
||||
&mut self,
|
||||
validation_code: &[u8],
|
||||
params: ValidationParams,
|
||||
test_mode: bool,
|
||||
execution_mode: ValidationExecutionMode,
|
||||
) -> Result<ValidationResult, ValidationError> {
|
||||
if validation_code.len() > MAX_CODE_MEM {
|
||||
return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len())));
|
||||
}
|
||||
// First, check if need to spawn the child process
|
||||
self.start_worker(test_mode)?;
|
||||
self.start_worker(execution_mode)?;
|
||||
let memory = self.memory.as_mut()
|
||||
.expect("memory is always `Some` after `start_worker` completes successfully");
|
||||
{
|
||||
|
||||
@@ -16,11 +16,16 @@
|
||||
|
||||
//! Basic parachain that adds a number as part of its state.
|
||||
|
||||
use parachain::primitives::{
|
||||
RelayChainBlockNumber,
|
||||
BlockData as GenericBlockData,
|
||||
HeadData as GenericHeadData,
|
||||
ValidationParams,
|
||||
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
|
||||
|
||||
use parachain::{
|
||||
primitives::{
|
||||
RelayChainBlockNumber,
|
||||
BlockData as GenericBlockData,
|
||||
HeadData as GenericHeadData,
|
||||
ValidationParams,
|
||||
},
|
||||
wasm_executor::{ValidationPool, ValidationExecutionMode}
|
||||
};
|
||||
use codec::{Decode, Encode};
|
||||
|
||||
@@ -52,8 +57,28 @@ fn hash_head(head: &HeadData) -> [u8; 32] {
|
||||
tiny_keccak::keccak256(head.encode().as_slice())
|
||||
}
|
||||
|
||||
fn validation_pool() -> ValidationPool {
|
||||
let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost {
|
||||
binary: std::env::current_exe().unwrap(),
|
||||
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
|
||||
};
|
||||
|
||||
ValidationPool::new(execution_mode)
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn execute_good_on_parent() {
|
||||
fn execute_good_on_parent_with_inprocess_validation() {
|
||||
let pool = ValidationPool::new(ValidationExecutionMode::InProcess);
|
||||
execute_good_on_parent(pool);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn execute_good_on_parent_with_external_process_validation() {
|
||||
let pool = validation_pool();
|
||||
execute_good_on_parent(pool);
|
||||
}
|
||||
|
||||
fn execute_good_on_parent(pool: ValidationPool) {
|
||||
let parent_head = HeadData {
|
||||
number: 0,
|
||||
parent_hash: [0; 32],
|
||||
@@ -65,7 +90,6 @@ pub fn execute_good_on_parent() {
|
||||
add: 512,
|
||||
};
|
||||
|
||||
let pool = parachain::wasm_executor::ValidationPool::new();
|
||||
|
||||
let ret = parachain::wasm_executor::validate_candidate(
|
||||
adder::wasm_binary_unwrap(),
|
||||
@@ -75,7 +99,7 @@ pub fn execute_good_on_parent() {
|
||||
relay_chain_height: 1,
|
||||
hrmp_mqc_heads: Vec::new(),
|
||||
},
|
||||
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
|
||||
parachain::wasm_executor::ExecutionMode::Remote(&pool),
|
||||
sp_core::testing::TaskExecutor::new(),
|
||||
).unwrap();
|
||||
|
||||
@@ -91,7 +115,7 @@ fn execute_good_chain_on_parent() {
|
||||
let mut number = 0;
|
||||
let mut parent_hash = [0; 32];
|
||||
let mut last_state = 0;
|
||||
let pool = parachain::wasm_executor::ValidationPool::new();
|
||||
let pool = validation_pool();
|
||||
|
||||
for add in 0..10 {
|
||||
let parent_head = HeadData {
|
||||
@@ -113,7 +137,7 @@ fn execute_good_chain_on_parent() {
|
||||
relay_chain_height: number as RelayChainBlockNumber + 1,
|
||||
hrmp_mqc_heads: Vec::new(),
|
||||
},
|
||||
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
|
||||
parachain::wasm_executor::ExecutionMode::Remote(&pool),
|
||||
sp_core::testing::TaskExecutor::new(),
|
||||
).unwrap();
|
||||
|
||||
@@ -131,7 +155,7 @@ fn execute_good_chain_on_parent() {
|
||||
|
||||
#[test]
|
||||
fn execute_bad_on_parent() {
|
||||
let pool = parachain::wasm_executor::ValidationPool::new();
|
||||
let pool = validation_pool();
|
||||
|
||||
let parent_head = HeadData {
|
||||
number: 0,
|
||||
@@ -152,7 +176,7 @@ fn execute_bad_on_parent() {
|
||||
relay_chain_height: 1,
|
||||
hrmp_mqc_heads: Vec::new(),
|
||||
},
|
||||
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
|
||||
parachain::wasm_executor::ExecutionMode::Remote(&pool),
|
||||
sp_core::testing::TaskExecutor::new(),
|
||||
).unwrap_err();
|
||||
}
|
||||
|
||||
@@ -16,15 +16,26 @@
|
||||
|
||||
//! Basic parachain that adds a number as part of its state.
|
||||
|
||||
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
|
||||
|
||||
use crate::adder;
|
||||
use parachain::{
|
||||
primitives::{BlockData, ValidationParams},
|
||||
wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC},
|
||||
wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode, ValidationPool},
|
||||
};
|
||||
|
||||
fn validation_pool() -> ValidationPool {
|
||||
let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost {
|
||||
binary: std::env::current_exe().unwrap(),
|
||||
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
|
||||
};
|
||||
|
||||
ValidationPool::new(execution_mode)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn terminates_on_timeout() {
|
||||
let pool = parachain::wasm_executor::ValidationPool::new();
|
||||
let pool = validation_pool();
|
||||
|
||||
let result = parachain::wasm_executor::validate_candidate(
|
||||
halt::wasm_binary_unwrap(),
|
||||
@@ -34,7 +45,7 @@ fn terminates_on_timeout() {
|
||||
relay_chain_height: 1,
|
||||
hrmp_mqc_heads: Vec::new(),
|
||||
},
|
||||
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
|
||||
parachain::wasm_executor::ExecutionMode::Remote(&pool),
|
||||
sp_core::testing::TaskExecutor::new(),
|
||||
);
|
||||
match result {
|
||||
@@ -43,12 +54,12 @@ fn terminates_on_timeout() {
|
||||
}
|
||||
|
||||
// check that another parachain can validate normaly
|
||||
adder::execute_good_on_parent();
|
||||
adder::execute_good_on_parent_with_external_process_validation();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parallel_execution() {
|
||||
let pool = parachain::wasm_executor::ValidationPool::new();
|
||||
let pool = validation_pool();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
@@ -62,7 +73,7 @@ fn parallel_execution() {
|
||||
relay_chain_height: 1,
|
||||
hrmp_mqc_heads: Vec::new(),
|
||||
},
|
||||
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2),
|
||||
parachain::wasm_executor::ExecutionMode::Remote(&pool2),
|
||||
sp_core::testing::TaskExecutor::new(),
|
||||
).ok());
|
||||
let _ = parachain::wasm_executor::validate_candidate(
|
||||
@@ -73,7 +84,7 @@ fn parallel_execution() {
|
||||
relay_chain_height: 1,
|
||||
hrmp_mqc_heads: Vec::new(),
|
||||
},
|
||||
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
|
||||
parachain::wasm_executor::ExecutionMode::Remote(&pool),
|
||||
sp_core::testing::TaskExecutor::new(),
|
||||
);
|
||||
thread.join().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user