mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 10:31:03 +00:00
Parachain validation moved to external process (#325)
* Improved execution & tests * Style * Made CLI arg const * Moved Upwards message * CLI subcommand for validation worker * Build halting parachain * Build halting parachain * Made stuff private * Reorganized parachain tests * Comment * Whitespace * Apply suggestions from code review Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Fixed call data size check and introduced an enum * Apply suggestions from code review Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
committed by
Bastian Köcher
parent
157cd9a217
commit
f1fdb0cb83
@@ -207,3 +207,13 @@ pub struct UpwardMessageRef<'a> {
|
||||
/// Underlying data of the message.
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
/// A message from a parachain to its Relay Chain.
|
||||
#[derive(Clone, PartialEq, Eq, Encode, Decode)]
|
||||
#[cfg_attr(feature = "std", derive(Debug))]
|
||||
pub struct UpwardMessage {
|
||||
/// The origin for the message to be sent from.
|
||||
pub origin: ParachainDispatchOrigin,
|
||||
/// The message data.
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
@@ -20,14 +20,39 @@
|
||||
//! Assuming the parameters are correct, this module provides a wrapper around
|
||||
//! a WASM VM for re-execution of a parachain candidate.
|
||||
|
||||
use std::{cell::RefCell, fmt, convert::TryInto};
|
||||
use std::{cell::RefCell, fmt, convert::TryInto, process, env};
|
||||
use std::sync::{Arc, atomic};
|
||||
use crate::codec::{Decode, Encode};
|
||||
use wasmi::{
|
||||
self, Module, ModuleInstance, Trap, MemoryInstance, MemoryDescriptor, MemoryRef,
|
||||
ModuleImportResolver, RuntimeValue, Externals, Error as WasmError, ValueType,
|
||||
memory_units::{self, Bytes, Pages, RoundUpTo}
|
||||
};
|
||||
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef};
|
||||
use super::{ValidationParams, ValidationResult, MessageRef, UpwardMessageRef, UpwardMessage, IncomingMessage};
|
||||
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
|
||||
use parking_lot::Mutex;
|
||||
use log::{trace, debug};
|
||||
|
||||
// maximum memory in bytes
|
||||
const MAX_RUNTIME_MEM: usize = 1024 * 1024 * 1024; // 1 GiB
|
||||
const MAX_CODE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
|
||||
// Message data limit
|
||||
const MAX_MESSAGE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
|
||||
|
||||
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
|
||||
/// CLI Argument to start in validation worker mode.
|
||||
const WORKER_ARG: &'static str = "validation-worker";
|
||||
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
|
||||
|
||||
enum Event {
|
||||
CandidateReady = 0,
|
||||
ResultReady = 1,
|
||||
WorkerReady = 2,
|
||||
}
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref HOST: Mutex<ValidationHost> = Mutex::new(ValidationHost::new());
|
||||
}
|
||||
|
||||
mod ids {
|
||||
/// Post a message to another parachain.
|
||||
@@ -37,6 +62,16 @@ mod ids {
|
||||
pub const POST_UPWARDS_MESSAGE: usize = 2;
|
||||
}
|
||||
|
||||
/// WASM code execution mode.
|
||||
pub enum ExecutionMode {
|
||||
/// Execute in-process. The execution can not be interrupted or aborted.
|
||||
Local,
|
||||
/// Remote execution in a spawned process.
|
||||
Remote,
|
||||
/// Remote execution in a spawned test runner.
|
||||
RemoteTest,
|
||||
}
|
||||
|
||||
/// Error type for the wasm executor
|
||||
#[derive(Debug, derive_more::Display, derive_more::From)]
|
||||
pub enum Error {
|
||||
@@ -44,12 +79,23 @@ pub enum Error {
|
||||
Wasm(WasmError),
|
||||
/// Externalities error
|
||||
Externalities(ExternalitiesError),
|
||||
/// Call data too big. WASM32 only has a 32-bit address space.
|
||||
#[display(fmt = "Validation parameters took up {} bytes, max allowed by WASM is {}", _0, i32::max_value())]
|
||||
/// Code size it too large.
|
||||
#[display(fmt = "WASM code is {} bytes, max allowed is {}", _0, MAX_CODE_MEM)]
|
||||
CodeTooLarge(usize),
|
||||
/// Call data is too large.
|
||||
#[display(fmt = "Validation parameters are {} bytes, max allowed is {}", _0, MAX_RUNTIME_MEM)]
|
||||
ParamsTooLarge(usize),
|
||||
/// Bad return data or type.
|
||||
#[display(fmt = "Validation function returned invalid data.")]
|
||||
BadReturn,
|
||||
#[display(fmt = "Validation function timeout.")]
|
||||
Timeout,
|
||||
#[display(fmt = "IO error: {}", _0)]
|
||||
Io(std::io::Error),
|
||||
#[display(fmt = "System error: {}", _0)]
|
||||
System(Box<dyn std::error::Error>),
|
||||
#[display(fmt = "WASM worker error: {}", _0)]
|
||||
External(String),
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {
|
||||
@@ -57,6 +103,8 @@ impl std::error::Error for Error {
|
||||
match self {
|
||||
Error::Wasm(ref err) => Some(err),
|
||||
Error::Externalities(ref err) => Some(err),
|
||||
Error::Io(ref err) => Some(err),
|
||||
Error::System(ref err) => Some(&**err),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -238,6 +286,296 @@ impl<'a, E: 'a + Externalities> Externals for ValidationExternals<'a, E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Params header in shared memory. All offsets should be aligned to WASM page size.
|
||||
#[derive(Encode, Decode, Debug)]
|
||||
struct ValidationHeader {
|
||||
code_size: u64,
|
||||
params_size: u64,
|
||||
}
|
||||
|
||||
#[derive(Encode, Decode, Debug)]
|
||||
pub enum ValidationResultHeader {
|
||||
Ok {
|
||||
result: ValidationResult,
|
||||
egress_message_count: u64,
|
||||
up_message_count: u64,
|
||||
},
|
||||
Error(String),
|
||||
}
|
||||
|
||||
|
||||
#[derive(Default)]
|
||||
struct WorkerExternalities {
|
||||
egress_data: Vec<u8>,
|
||||
egress_message_count: usize,
|
||||
up_data: Vec<u8>,
|
||||
up_message_count: usize,
|
||||
}
|
||||
|
||||
impl Externalities for WorkerExternalities {
|
||||
fn post_message(&mut self, message: MessageRef) -> Result<(), ExternalitiesError> {
|
||||
IncomingMessage {
|
||||
source: message.target,
|
||||
data: message.data.to_vec(),
|
||||
}
|
||||
.encode_to(&mut self.egress_data);
|
||||
self.egress_message_count += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn post_upward_message(&mut self, message: UpwardMessageRef) -> Result<(), ExternalitiesError> {
|
||||
UpwardMessage {
|
||||
origin: message.origin,
|
||||
data: message.data.to_vec(),
|
||||
}
|
||||
.encode_to(&mut self.up_data);
|
||||
self.up_message_count += 1;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Validation worker process entry point. Runs a loop waiting for canidates to validate
|
||||
/// and sends back results via shared memory.
|
||||
pub fn run_worker(mem_id: &str) -> Result<(), String> {
|
||||
let mut memory = match SharedMem::open(mem_id) {
|
||||
Ok(memory) => memory,
|
||||
Err(e) => {
|
||||
debug!("Error opening shared memory: {:?}", e);
|
||||
return Err(format!("Error opening shared memory: {:?}", e));
|
||||
}
|
||||
};
|
||||
let mut externalities = WorkerExternalities::default();
|
||||
|
||||
let exit = Arc::new(atomic::AtomicBool::new(false));
|
||||
// spawn parent monitor thread
|
||||
let watch_exit = exit.clone();
|
||||
std::thread::spawn(move || {
|
||||
use std::io::Read;
|
||||
let mut in_data = Vec::new();
|
||||
std::io::stdin().read_to_end(&mut in_data).ok(); // pipe terminates when parent process exits
|
||||
debug!("Parent process is dead. Exiting");
|
||||
exit.store(true, atomic::Ordering::Relaxed);
|
||||
});
|
||||
|
||||
memory.set(Event::WorkerReady as usize, EventState::Signaled)
|
||||
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
|
||||
|
||||
loop {
|
||||
if watch_exit.load(atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
debug!("Waiting for candidate");
|
||||
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) {
|
||||
Err(e) => {
|
||||
// Timeout
|
||||
trace!("Timeout waiting for candidate: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
Ok(()) => {}
|
||||
}
|
||||
|
||||
{
|
||||
debug!("Processing candidate");
|
||||
// we have candidate data
|
||||
let mut slice = memory.wlock_as_slice(0)
|
||||
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
|
||||
|
||||
let result = {
|
||||
let data: &mut[u8] = &mut **slice;
|
||||
let (header_buf, rest) = data.split_at_mut(1024);
|
||||
let mut header_buf: &[u8] = header_buf;
|
||||
let header = ValidationHeader::decode(&mut header_buf)
|
||||
.ok_or_else(|| format!("Error decoding validation request."))?;
|
||||
debug!("Candidate header: {:?}", header);
|
||||
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
|
||||
let (code, _) = code.split_at_mut(header.code_size as usize);
|
||||
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
|
||||
let (call_data, _) = call_data.split_at_mut(header.params_size as usize);
|
||||
let message_data = rest;
|
||||
|
||||
let result = validate_candidate_internal(code, call_data, &mut externalities);
|
||||
debug!("Candidate validated: {:?}", result);
|
||||
|
||||
match result {
|
||||
Ok(r) => {
|
||||
if externalities.egress_data.len() + externalities.up_data.len() > MAX_MESSAGE_MEM {
|
||||
ValidationResultHeader::Error("Message data is too large".into())
|
||||
} else {
|
||||
let e_len = externalities.egress_data.len();
|
||||
let up_len = externalities.up_data.len();
|
||||
message_data[0..e_len].copy_from_slice(&externalities.egress_data);
|
||||
message_data[e_len..(e_len + up_len)].copy_from_slice(&externalities.up_data);
|
||||
ValidationResultHeader::Ok {
|
||||
result: r,
|
||||
egress_message_count: externalities.egress_message_count as u64,
|
||||
up_message_count: externalities.up_message_count as u64,
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => ValidationResultHeader::Error(e.to_string()),
|
||||
}
|
||||
};
|
||||
let mut data: &mut[u8] = &mut **slice;
|
||||
result.encode_to(&mut data);
|
||||
}
|
||||
debug!("Signaling result");
|
||||
memory.set(Event::ResultReady as usize, EventState::Signaled)
|
||||
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
unsafe impl Send for ValidationHost {}
|
||||
|
||||
struct ValidationHost {
|
||||
worker: Option<process::Child>,
|
||||
memory: Option<SharedMem>,
|
||||
}
|
||||
|
||||
|
||||
impl Drop for ValidationHost {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref mut worker) = &mut self.worker {
|
||||
worker.kill().ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ValidationHost {
|
||||
fn create_memory() -> Result<SharedMem, Error> {
|
||||
let mem_size = MAX_RUNTIME_MEM + MAX_CODE_MEM + MAX_MESSAGE_MEM + 1024;
|
||||
let mem_config = SharedMemConf::new()
|
||||
.set_size(mem_size)
|
||||
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
|
||||
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
|
||||
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
|
||||
.add_event(shared_memory::EventType::Auto)?; // Evebt::WorkerReady
|
||||
|
||||
Ok(mem_config.create()?)
|
||||
}
|
||||
|
||||
fn new() -> ValidationHost {
|
||||
ValidationHost {
|
||||
worker: None,
|
||||
memory: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
|
||||
if let Some(ref mut worker) = self.worker {
|
||||
// Check if still alive
|
||||
if let Ok(None) = worker.try_wait() {
|
||||
// Still running
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let memory = Self::create_memory()?;
|
||||
let self_path = env::current_exe()?;
|
||||
debug!("Starting worker at {:?}", self_path);
|
||||
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
|
||||
args.push(memory.get_os_path());
|
||||
let worker = process::Command::new(self_path)
|
||||
.args(args)
|
||||
.stdin(process::Stdio::piped())
|
||||
.spawn()?;
|
||||
self.worker = Some(worker);
|
||||
|
||||
memory.wait(Event::WorkerReady as usize, shared_memory::Timeout::Sec(5))?;
|
||||
self.memory = Some(memory);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate a candidate under the given validation code.
|
||||
///
|
||||
/// This will fail if the validation code is not a proper parachain validation module.
|
||||
fn validate_candidate<E: Externalities>(
|
||||
&mut self,
|
||||
validation_code: &[u8],
|
||||
params: ValidationParams,
|
||||
externalities: &mut E,
|
||||
test_mode: bool,
|
||||
) -> Result<ValidationResult, Error>
|
||||
{
|
||||
if validation_code.len() > MAX_CODE_MEM {
|
||||
return Err(Error::CodeTooLarge(validation_code.len()));
|
||||
}
|
||||
// First, check if need to spawn the child process
|
||||
self.start_worker(test_mode)?;
|
||||
let memory = self.memory.as_mut().expect("memory is always `Some` after `start_worker` completes successfully");
|
||||
{
|
||||
// Put data in shared mem
|
||||
let data: &mut[u8] = &mut **memory.wlock_as_slice(0)?;
|
||||
let (mut header_buf, rest) = data.split_at_mut(1024);
|
||||
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
|
||||
let (code, _) = code.split_at_mut(validation_code.len());
|
||||
let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
|
||||
code[..validation_code.len()].copy_from_slice(validation_code);
|
||||
let encoded_params = params.encode();
|
||||
if encoded_params.len() >= MAX_RUNTIME_MEM {
|
||||
return Err(Error::ParamsTooLarge(MAX_RUNTIME_MEM));
|
||||
}
|
||||
call_data[..encoded_params.len()].copy_from_slice(&encoded_params);
|
||||
|
||||
let header = ValidationHeader {
|
||||
code_size: validation_code.len() as u64,
|
||||
params_size: encoded_params.len() as u64,
|
||||
};
|
||||
|
||||
header.encode_to(&mut header_buf);
|
||||
}
|
||||
|
||||
debug!("Signaling candidate");
|
||||
memory.set(Event::CandidateReady as usize, EventState::Signaled)?;
|
||||
|
||||
debug!("Waiting for results");
|
||||
match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(5)) {
|
||||
Err(e) => {
|
||||
debug!("Worker timeout: {:?}", e);
|
||||
if let Some(mut worker) = self.worker.take() {
|
||||
worker.kill().ok();
|
||||
}
|
||||
return Err(Error::Timeout.into());
|
||||
}
|
||||
Ok(()) => {}
|
||||
}
|
||||
|
||||
{
|
||||
let data: &[u8] = &**memory.wlock_as_slice(0)?;
|
||||
let (header_buf, rest) = data.split_at(1024);
|
||||
let (_, rest) = rest.split_at(MAX_CODE_MEM);
|
||||
let (_, message_data) = rest.split_at(MAX_RUNTIME_MEM);
|
||||
let mut header_buf: &[u8] = header_buf;
|
||||
let mut message_data: &[u8] = message_data;
|
||||
let header = ValidationResultHeader::decode(&mut header_buf).unwrap();
|
||||
match header {
|
||||
ValidationResultHeader::Ok { result, egress_message_count, up_message_count } => {
|
||||
for _ in 0 .. egress_message_count {
|
||||
let message = IncomingMessage::decode(&mut message_data).unwrap();
|
||||
let message_ref = MessageRef {
|
||||
target: message.source,
|
||||
data: &message.data,
|
||||
};
|
||||
externalities.post_message(message_ref)?;
|
||||
}
|
||||
for _ in 0 .. up_message_count {
|
||||
let message = UpwardMessage::decode(&mut message_data).unwrap();
|
||||
let message_ref = UpwardMessageRef {
|
||||
origin: message.origin,
|
||||
data: &message.data,
|
||||
};
|
||||
externalities.post_upward_message(message_ref)?;
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
ValidationResultHeader::Error(message) => {
|
||||
Err(Error::External(message).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validate a candidate under the given validation code.
|
||||
///
|
||||
/// This will fail if the validation code is not a proper parachain validation module.
|
||||
@@ -245,11 +583,29 @@ pub fn validate_candidate<E: Externalities>(
|
||||
validation_code: &[u8],
|
||||
params: ValidationParams,
|
||||
externalities: &mut E,
|
||||
) -> Result<ValidationResult, Error> {
|
||||
use wasmi::LINEAR_MEMORY_PAGE_SIZE;
|
||||
options: ExecutionMode,
|
||||
) -> Result<ValidationResult, Error>
|
||||
{
|
||||
match options {
|
||||
ExecutionMode::Local => {
|
||||
validate_candidate_internal(validation_code, ¶ms.encode(), externalities)
|
||||
},
|
||||
ExecutionMode::Remote =>
|
||||
HOST.lock().validate_candidate(validation_code, params, externalities, false),
|
||||
ExecutionMode::RemoteTest =>
|
||||
HOST.lock().validate_candidate(validation_code, params, externalities, true),
|
||||
}
|
||||
}
|
||||
|
||||
// maximum memory in bytes
|
||||
const MAX_MEM: u32 = 1024 * 1024 * 1024; // 1 GiB
|
||||
/// Validate a candidate under the given validation code.
|
||||
///
|
||||
/// This will fail if the validation code is not a proper parachain validation module.
|
||||
pub fn validate_candidate_internal<E: Externalities>(
|
||||
validation_code: &[u8],
|
||||
encoded_call_data: &[u8],
|
||||
externalities: &mut E,
|
||||
) -> Result<ValidationResult, Error> {
|
||||
use wasmi::LINEAR_MEMORY_PAGE_SIZE;
|
||||
|
||||
// instantiate the module.
|
||||
let memory;
|
||||
@@ -258,7 +614,7 @@ pub fn validate_candidate<E: Externalities>(
|
||||
let module = Module::from_buffer(validation_code)?;
|
||||
|
||||
let module_resolver = Resolver {
|
||||
max_memory: MAX_MEM / LINEAR_MEMORY_PAGE_SIZE.0 as u32,
|
||||
max_memory: (MAX_RUNTIME_MEM / LINEAR_MEMORY_PAGE_SIZE.0) as u32,
|
||||
memory: RefCell::new(None),
|
||||
};
|
||||
|
||||
@@ -285,8 +641,6 @@ pub fn validate_candidate<E: Externalities>(
|
||||
// - `offset` has alignment at least of 8,
|
||||
// - `len` is not zero.
|
||||
let (offset, len) = {
|
||||
let encoded_call_data = params.encode();
|
||||
|
||||
// hard limit from WASM.
|
||||
if encoded_call_data.len() > i32::max_value() as usize {
|
||||
return Err(Error::ParamsTooLarge(encoded_call_data.len()));
|
||||
|
||||
Reference in New Issue
Block a user