mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 11:41:02 +00:00
Remote execution with additional logging (#767)
This commit is contained in:
committed by
Bastian Köcher
parent
9ec65c94d7
commit
95afc7c2fd
@@ -36,6 +36,10 @@ const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
|
|||||||
const NUM_HOSTS: usize = 8;
|
const NUM_HOSTS: usize = 8;
|
||||||
|
|
||||||
/// Execution timeout in seconds;
|
/// Execution timeout in seconds;
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
pub const EXECUTION_TIMEOUT_SEC: u64 = 30;
|
||||||
|
|
||||||
|
#[cfg(not(debug_assertions))]
|
||||||
pub const EXECUTION_TIMEOUT_SEC: u64 = 5;
|
pub const EXECUTION_TIMEOUT_SEC: u64 = 5;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@@ -85,7 +89,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
|
|||||||
let mut memory = match SharedMem::open(mem_id) {
|
let mut memory = match SharedMem::open(mem_id) {
|
||||||
Ok(memory) => memory,
|
Ok(memory) => memory,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Error opening shared memory: {:?}", e);
|
debug!("{} Error opening shared memory: {:?}", process::id(), e);
|
||||||
return Err(format!("Error opening shared memory: {:?}", e));
|
return Err(format!("Error opening shared memory: {:?}", e));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -98,32 +102,32 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
|
|||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
let mut in_data = Vec::new();
|
let mut in_data = Vec::new();
|
||||||
// pipe terminates when parent process exits
|
// pipe terminates when parent process exits
|
||||||
std::io::stdin().read_to_end(&mut in_data).ok();
|
std::io::stdin().read_to_end(&mut in_data).ok();
|
||||||
debug!("Parent process is dead. Exiting");
|
debug!("{} Parent process is dead. Exiting", process::id());
|
||||||
exit.store(true, atomic::Ordering::Relaxed);
|
exit.store(true, atomic::Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
|
|
||||||
memory.set(Event::WorkerReady as usize, EventState::Signaled)
|
memory.set(Event::WorkerReady as usize, EventState::Signaled)
|
||||||
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
|
.map_err(|e| format!("{} Error setting shared event: {:?}", process::id(), e))?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if watch_exit.load(atomic::Ordering::Relaxed) {
|
if watch_exit.load(atomic::Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Waiting for candidate");
|
debug!("{} Waiting for candidate", process::id());
|
||||||
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) {
|
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(3)) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Timeout
|
// Timeout
|
||||||
trace!("Timeout waiting for candidate: {:?}", e);
|
trace!("{} Timeout waiting for candidate: {:?}", process::id(), e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
debug!("Processing candidate");
|
debug!("{} Processing candidate", process::id());
|
||||||
// we have candidate data
|
// we have candidate data
|
||||||
let mut slice = memory.wlock_as_slice(0)
|
let mut slice = memory.wlock_as_slice(0)
|
||||||
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
|
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
|
||||||
@@ -134,7 +138,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
|
|||||||
let mut header_buf: &[u8] = header_buf;
|
let mut header_buf: &[u8] = header_buf;
|
||||||
let header = ValidationHeader::decode(&mut header_buf)
|
let header = ValidationHeader::decode(&mut header_buf)
|
||||||
.map_err(|_| format!("Error decoding validation request."))?;
|
.map_err(|_| format!("Error decoding validation request."))?;
|
||||||
debug!("Candidate header: {:?}", header);
|
debug!("{} Candidate header: {:?}", process::id(), header);
|
||||||
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
|
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
|
||||||
let (code, _) = code.split_at_mut(header.code_size as usize);
|
let (code, _) = code.split_at_mut(header.code_size as usize);
|
||||||
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
|
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
|
||||||
@@ -142,7 +146,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
|
|||||||
let message_data = rest;
|
let message_data = rest;
|
||||||
|
|
||||||
let result = validate_candidate_internal(code, call_data, worker_ext.clone());
|
let result = validate_candidate_internal(code, call_data, worker_ext.clone());
|
||||||
debug!("Candidate validated: {:?}", result);
|
debug!("{} Candidate validated: {:?}", process::id(), result);
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(r) => {
|
Ok(r) => {
|
||||||
@@ -168,7 +172,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
|
|||||||
let mut data: &mut[u8] = &mut **slice;
|
let mut data: &mut[u8] = &mut **slice;
|
||||||
result.encode_to(&mut data);
|
result.encode_to(&mut data);
|
||||||
}
|
}
|
||||||
debug!("Signaling result");
|
debug!("{} Signaling result", process::id());
|
||||||
memory.set(Event::ResultReady as usize, EventState::Signaled)
|
memory.set(Event::ResultReady as usize, EventState::Signaled)
|
||||||
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
|
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
|
||||||
}
|
}
|
||||||
@@ -194,6 +198,7 @@ unsafe impl Send for ValidationHost {}
|
|||||||
struct ValidationHost {
|
struct ValidationHost {
|
||||||
worker: Option<process::Child>,
|
worker: Option<process::Child>,
|
||||||
memory: Option<SharedMem>,
|
memory: Option<SharedMem>,
|
||||||
|
id: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Validate a candidate under the given validation code.
|
/// Validate a candidate under the given validation code.
|
||||||
@@ -253,6 +258,7 @@ impl ValidationHost {
|
|||||||
.args(args)
|
.args(args)
|
||||||
.stdin(process::Stdio::piped())
|
.stdin(process::Stdio::piped())
|
||||||
.spawn()?;
|
.spawn()?;
|
||||||
|
self.id = worker.id();
|
||||||
self.worker = Some(worker);
|
self.worker = Some(worker);
|
||||||
|
|
||||||
memory.wait(
|
memory.wait(
|
||||||
@@ -302,11 +308,11 @@ impl ValidationHost {
|
|||||||
header.encode_to(&mut header_buf);
|
header.encode_to(&mut header_buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Signaling candidate");
|
debug!("{} Signaling candidate", self.id);
|
||||||
memory.set(Event::CandidateReady as usize, EventState::Signaled)?;
|
memory.set(Event::CandidateReady as usize, EventState::Signaled)?;
|
||||||
|
|
||||||
debug!("Waiting for results");
|
debug!("{} Waiting for results", self.id);
|
||||||
match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(5)) {
|
match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize)) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Worker timeout: {:?}", e);
|
debug!("Worker timeout: {:?}", e);
|
||||||
if let Some(mut worker) = self.worker.take() {
|
if let Some(mut worker) = self.worker.take() {
|
||||||
@@ -318,6 +324,7 @@ impl ValidationHost {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
debug!("{} Reading results", self.id);
|
||||||
let data: &[u8] = &**memory.wlock_as_slice(0)?;
|
let data: &[u8] = &**memory.wlock_as_slice(0)?;
|
||||||
let (header_buf, rest) = data.split_at(1024);
|
let (header_buf, rest) = data.split_at(1024);
|
||||||
let (_, rest) = rest.split_at(MAX_CODE_MEM);
|
let (_, rest) = rest.split_at(MAX_CODE_MEM);
|
||||||
@@ -346,6 +353,7 @@ impl ValidationHost {
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
ValidationResultHeader::Error(message) => {
|
ValidationResultHeader::Error(message) => {
|
||||||
|
debug!("{} Validation error: {}", self.id, message);
|
||||||
Err(Error::External(message).into())
|
Err(Error::External(message).into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -461,7 +461,7 @@ fn do_validation<P>(
|
|||||||
&validation_code,
|
&validation_code,
|
||||||
params,
|
params,
|
||||||
ext.clone(),
|
ext.clone(),
|
||||||
ExecutionMode::Local,
|
ExecutionMode::Remote,
|
||||||
) {
|
) {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
if result.head_data == head_data.0 {
|
if result.head_data == head_data.0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user