mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 12:51:02 +00:00
Allow parallel parachain validation (#370)
* Allow parallel parachain validation * Fixed test interference * Switch to pooled implementation * Apply suggestions from code review Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Update validation_host.rs * Minor cleanups * Fixed build
This commit is contained in:
committed by
Robert Habermeier
parent
7ae4f606c1
commit
908473c26b
@@ -27,10 +27,13 @@ use wasmi::{
|
|||||||
ModuleImportResolver, RuntimeValue, Externals, Error as WasmError, ValueType,
|
ModuleImportResolver, RuntimeValue, Externals, Error as WasmError, ValueType,
|
||||||
memory_units::{self, Bytes, Pages, RoundUpTo}
|
memory_units::{self, Bytes, Pages, RoundUpTo}
|
||||||
};
|
};
|
||||||
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
|
use super::{
|
||||||
|
ValidationParams, ValidationResult, MessageRef, UpwardMessageRef,
|
||||||
|
UpwardMessage, IncomingMessage};
|
||||||
|
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
pub use validation_host::run_worker;
|
pub use validation_host::run_worker;
|
||||||
|
pub use validation_host::EXECUTION_TIMEOUT_SEC;
|
||||||
|
|
||||||
mod validation_host;
|
mod validation_host;
|
||||||
|
|
||||||
@@ -336,10 +339,10 @@ pub fn validate_candidate<E: Externalities>(
|
|||||||
},
|
},
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
ExecutionMode::Remote =>
|
ExecutionMode::Remote =>
|
||||||
validation_host::HOST.lock().validate_candidate(validation_code, params, externalities, false),
|
validation_host::validate_candidate(validation_code, params, externalities, false),
|
||||||
#[cfg(not(target_os = "unknown"))]
|
#[cfg(not(target_os = "unknown"))]
|
||||||
ExecutionMode::RemoteTest =>
|
ExecutionMode::RemoteTest =>
|
||||||
validation_host::HOST.lock().validate_candidate(validation_code, params, externalities, true),
|
validation_host::validate_candidate(validation_code, params, externalities, true),
|
||||||
#[cfg(target_os = "unknown")]
|
#[cfg(target_os = "unknown")]
|
||||||
ExecutionMode::Remote =>
|
ExecutionMode::Remote =>
|
||||||
Err(Error::System("Remote validator not available".to_string().into())),
|
Err(Error::System("Remote validator not available".to_string().into())),
|
||||||
|
|||||||
@@ -18,7 +18,8 @@
|
|||||||
|
|
||||||
use std::{process, env, sync::Arc, sync::atomic};
|
use std::{process, env, sync::Arc, sync::atomic};
|
||||||
use crate::codec::{Decode, Encode};
|
use crate::codec::{Decode, Encode};
|
||||||
use crate::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
|
use crate::{ValidationParams, ValidationResult, MessageRef,
|
||||||
|
UpwardMessageRef, UpwardMessage, IncomingMessage};
|
||||||
use super::{validate_candidate_internal, Error, Externalities, WorkerExternalities};
|
use super::{validate_candidate_internal, Error, Externalities, WorkerExternalities};
|
||||||
use super::{MAX_CODE_MEM, MAX_RUNTIME_MEM};
|
use super::{MAX_CODE_MEM, MAX_RUNTIME_MEM};
|
||||||
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
|
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
|
||||||
@@ -33,6 +34,11 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
|
|||||||
const WORKER_ARG: &'static str = "validation-worker";
|
const WORKER_ARG: &'static str = "validation-worker";
|
||||||
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
|
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
|
||||||
|
|
||||||
|
const NUM_HOSTS: usize = 8;
|
||||||
|
|
||||||
|
/// Execution timeout in seconds;
|
||||||
|
pub const EXECUTION_TIMEOUT_SEC: u64 = 5;
|
||||||
|
|
||||||
enum Event {
|
enum Event {
|
||||||
CandidateReady = 0,
|
CandidateReady = 0,
|
||||||
ResultReady = 1,
|
ResultReady = 1,
|
||||||
@@ -40,7 +46,7 @@ enum Event {
|
|||||||
}
|
}
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
pub static ref HOST: Mutex<ValidationHost> = Mutex::new(ValidationHost::new());
|
static ref HOSTS: [Mutex<ValidationHost>; NUM_HOSTS] = Default::default();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
|
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
|
||||||
@@ -154,12 +160,32 @@ pub enum ValidationResultHeader {
|
|||||||
|
|
||||||
unsafe impl Send for ValidationHost {}
|
unsafe impl Send for ValidationHost {}
|
||||||
|
|
||||||
pub struct ValidationHost {
|
#[derive(Default)]
|
||||||
|
struct ValidationHost {
|
||||||
worker: Option<process::Child>,
|
worker: Option<process::Child>,
|
||||||
memory: Option<SharedMem>,
|
memory: Option<SharedMem>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Validate a candidate under the given validation code.
|
||||||
|
///
|
||||||
|
/// This will fail if the validation code is not a proper parachain validation module.
|
||||||
|
pub fn validate_candidate<E: Externalities>(
|
||||||
|
validation_code: &[u8],
|
||||||
|
params: ValidationParams,
|
||||||
|
externalities: &mut E,
|
||||||
|
test_mode: bool,
|
||||||
|
) -> Result<ValidationResult, Error> {
|
||||||
|
for host in HOSTS.iter() {
|
||||||
|
if let Some(mut host) = host.try_lock() {
|
||||||
|
return host.validate_candidate(validation_code, params, externalities, test_mode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// all workers are busy, just wait for the first one
|
||||||
|
HOSTS[0].lock().validate_candidate(validation_code, params, externalities, test_mode)
|
||||||
|
}
|
||||||
|
|
||||||
impl Drop for ValidationHost {
|
impl Drop for ValidationHost {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(ref mut worker) = &mut self.worker {
|
if let Some(ref mut worker) = &mut self.worker {
|
||||||
@@ -176,18 +202,11 @@ impl ValidationHost {
|
|||||||
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
|
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
|
||||||
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
|
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
|
||||||
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
|
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
|
||||||
.add_event(shared_memory::EventType::Auto)?; // Evebt::WorkerReady
|
.add_event(shared_memory::EventType::Auto)?; // Event::WorkerReady
|
||||||
|
|
||||||
Ok(mem_config.create()?)
|
Ok(mem_config.create()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new() -> ValidationHost {
|
|
||||||
ValidationHost {
|
|
||||||
worker: None,
|
|
||||||
memory: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
|
fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
|
||||||
if let Some(ref mut worker) = self.worker {
|
if let Some(ref mut worker) = self.worker {
|
||||||
// Check if still alive
|
// Check if still alive
|
||||||
@@ -207,7 +226,7 @@ impl ValidationHost {
|
|||||||
.spawn()?;
|
.spawn()?;
|
||||||
self.worker = Some(worker);
|
self.worker = Some(worker);
|
||||||
|
|
||||||
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(5))?;
|
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize))?;
|
||||||
self.memory = Some(memory);
|
self.memory = Some(memory);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -221,8 +240,7 @@ impl ValidationHost {
|
|||||||
params: ValidationParams,
|
params: ValidationParams,
|
||||||
externalities: &mut E,
|
externalities: &mut E,
|
||||||
test_mode: bool,
|
test_mode: bool,
|
||||||
) -> Result<ValidationResult, Error>
|
) -> Result<ValidationResult, Error> {
|
||||||
{
|
|
||||||
if validation_code.len() > MAX_CODE_MEM {
|
if validation_code.len() > MAX_CODE_MEM {
|
||||||
return Err(Error::CodeTooLarge(validation_code.len()));
|
return Err(Error::CodeTooLarge(validation_code.len()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
use polkadot_parachain as parachain;
|
use polkadot_parachain as parachain;
|
||||||
use crate::{adder, DummyExt};
|
use crate::{adder, DummyExt};
|
||||||
use crate::parachain::ValidationParams;
|
use crate::parachain::{ValidationParams, wasm_executor::EXECUTION_TIMEOUT_SEC};
|
||||||
|
|
||||||
// Code that exposes `validate_block` and loops infinitely
|
// Code that exposes `validate_block` and loops infinitely
|
||||||
const INFINITE_LOOP_CODE: &[u8] = halt::WASM_BINARY;
|
const INFINITE_LOOP_CODE: &[u8] = halt::WASM_BINARY;
|
||||||
@@ -43,3 +43,35 @@ fn terminates_on_timeout() {
|
|||||||
// check that another parachain can validate normaly
|
// check that another parachain can validate normaly
|
||||||
adder::execute_good_on_parent();
|
adder::execute_good_on_parent();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parallel_execution() {
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
let thread = std::thread::spawn(move ||
|
||||||
|
parachain::wasm_executor::validate_candidate(
|
||||||
|
INFINITE_LOOP_CODE,
|
||||||
|
ValidationParams {
|
||||||
|
parent_head: Default::default(),
|
||||||
|
block_data: Vec::new(),
|
||||||
|
ingress: Vec::new(),
|
||||||
|
},
|
||||||
|
&mut DummyExt,
|
||||||
|
parachain::wasm_executor::ExecutionMode::RemoteTest,
|
||||||
|
).ok());
|
||||||
|
let _ = parachain::wasm_executor::validate_candidate(
|
||||||
|
INFINITE_LOOP_CODE,
|
||||||
|
ValidationParams {
|
||||||
|
parent_head: Default::default(),
|
||||||
|
block_data: Vec::new(),
|
||||||
|
ingress: Vec::new(),
|
||||||
|
},
|
||||||
|
&mut DummyExt,
|
||||||
|
parachain::wasm_executor::ExecutionMode::RemoteTest,
|
||||||
|
);
|
||||||
|
thread.join().unwrap();
|
||||||
|
// total time should be < 2 x EXECUTION_TIMEOUT_SEC
|
||||||
|
assert!(
|
||||||
|
std::time::Instant::now().duration_since(start)
|
||||||
|
< std::time::Duration::from_secs(EXECUTION_TIMEOUT_SEC * 2)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user