From 95afc7c2fd200e863f1c706ea7595c0ab0985fa4 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 15 Jan 2020 16:18:59 +0100 Subject: [PATCH] Remote execution with additional logging (#767) --- .../src/wasm_executor/validation_host.rs | 36 +++++++++++-------- polkadot/validation/src/collation.rs | 2 +- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/polkadot/parachain/src/wasm_executor/validation_host.rs b/polkadot/parachain/src/wasm_executor/validation_host.rs index a92a00f561..711d223b41 100644 --- a/polkadot/parachain/src/wasm_executor/validation_host.rs +++ b/polkadot/parachain/src/wasm_executor/validation_host.rs @@ -36,6 +36,10 @@ const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; const NUM_HOSTS: usize = 8; /// Execution timeout in seconds; +#[cfg(debug_assertions)] +pub const EXECUTION_TIMEOUT_SEC: u64 = 30; + +#[cfg(not(debug_assertions))] pub const EXECUTION_TIMEOUT_SEC: u64 = 5; #[derive(Default)] @@ -85,7 +89,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { let mut memory = match SharedMem::open(mem_id) { Ok(memory) => memory, Err(e) => { - debug!("Error opening shared memory: {:?}", e); + debug!("{} Error opening shared memory: {:?}", process::id(), e); return Err(format!("Error opening shared memory: {:?}", e)); } }; @@ -98,32 +102,32 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { std::thread::spawn(move || { use std::io::Read; let mut in_data = Vec::new(); - // pipe terminates when parent process exits + // pipe terminates when parent process exits std::io::stdin().read_to_end(&mut in_data).ok(); - debug!("Parent process is dead. Exiting"); + debug!("{} Parent process is dead. Exiting", process::id()); exit.store(true, atomic::Ordering::Relaxed); }); memory.set(Event::WorkerReady as usize, EventState::Signaled) - .map_err(|e| format!("Error setting shared event: {:?}", e))?; + .map_err(|e| format!("{} Error setting shared event: {:?}", process::id(), e))?; loop { if watch_exit.load(atomic::Ordering::Relaxed) { break; } - debug!("Waiting for candidate"); - match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) { + debug!("{} Waiting for candidate", process::id()); + match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(3)) { Err(e) => { // Timeout - trace!("Timeout waiting for candidate: {:?}", e); + trace!("{} Timeout waiting for candidate: {:?}", process::id(), e); continue; } Ok(()) => {} } { - debug!("Processing candidate"); + debug!("{} Processing candidate", process::id()); // we have candidate data let mut slice = memory.wlock_as_slice(0) .map_err(|e| format!("Error locking shared memory: {:?}", e))?; @@ -134,7 +138,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { let mut header_buf: &[u8] = header_buf; let header = ValidationHeader::decode(&mut header_buf) .map_err(|_| format!("Error decoding validation request."))?; - debug!("Candidate header: {:?}", header); + debug!("{} Candidate header: {:?}", process::id(), header); let (code, rest) = rest.split_at_mut(MAX_CODE_MEM); let (code, _) = code.split_at_mut(header.code_size as usize); let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM); @@ -142,7 +146,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { let message_data = rest; let result = validate_candidate_internal(code, call_data, worker_ext.clone()); - debug!("Candidate validated: {:?}", result); + debug!("{} Candidate validated: {:?}", process::id(), result); match result { Ok(r) => { @@ -168,7 +172,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { let mut data: &mut[u8] = &mut **slice; result.encode_to(&mut data); } - debug!("Signaling result"); + debug!("{} Signaling result", process::id()); memory.set(Event::ResultReady as usize, EventState::Signaled) .map_err(|e| format!("Error setting shared event: {:?}", e))?; } @@ -194,6 +198,7 @@ unsafe impl Send for ValidationHost {} struct ValidationHost { worker: Option, memory: Option, + id: u32, } /// Validate a candidate under the given validation code. @@ -253,6 +258,7 @@ impl ValidationHost { .args(args) .stdin(process::Stdio::piped()) .spawn()?; + self.id = worker.id(); self.worker = Some(worker); memory.wait( @@ -302,11 +308,11 @@ impl ValidationHost { header.encode_to(&mut header_buf); } - debug!("Signaling candidate"); + debug!("{} Signaling candidate", self.id); memory.set(Event::CandidateReady as usize, EventState::Signaled)?; - debug!("Waiting for results"); - match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(5)) { + debug!("{} Waiting for results", self.id); + match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize)) { Err(e) => { debug!("Worker timeout: {:?}", e); if let Some(mut worker) = self.worker.take() { @@ -318,6 +324,7 @@ impl ValidationHost { } { + debug!("{} Reading results", self.id); let data: &[u8] = &**memory.wlock_as_slice(0)?; let (header_buf, rest) = data.split_at(1024); let (_, rest) = rest.split_at(MAX_CODE_MEM); @@ -346,6 +353,7 @@ impl ValidationHost { Ok(result) } ValidationResultHeader::Error(message) => { + debug!("{} Validation error: {}", self.id, message); Err(Error::External(message).into()) } } diff --git a/polkadot/validation/src/collation.rs b/polkadot/validation/src/collation.rs index 8b8e45cc1f..6adc87db71 100644 --- a/polkadot/validation/src/collation.rs +++ b/polkadot/validation/src/collation.rs @@ -461,7 +461,7 @@ fn do_validation

( &validation_code, params, ext.clone(), - ExecutionMode::Local, + ExecutionMode::Remote, ) { Ok(result) => { if result.head_data == head_data.0 {