mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 08:01:09 +00:00
PVF: Remove rayon and some uses of tokio (#7153)
This commit is contained in:
@@ -18,15 +18,12 @@ use crate::LOG_TARGET;
|
|||||||
use cpu_time::ProcessTime;
|
use cpu_time::ProcessTime;
|
||||||
use futures::never::Never;
|
use futures::never::Never;
|
||||||
use std::{
|
use std::{
|
||||||
|
any::Any,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::mpsc::{Receiver, RecvTimeoutError},
|
sync::mpsc::{Receiver, RecvTimeoutError},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::{io, net::UnixStream, runtime::Runtime};
|
||||||
io,
|
|
||||||
net::UnixStream,
|
|
||||||
runtime::{Handle, Runtime},
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
|
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
|
||||||
/// child process.
|
/// child process.
|
||||||
@@ -44,7 +41,7 @@ pub fn worker_event_loop<F, Fut>(
|
|||||||
node_version: Option<&str>,
|
node_version: Option<&str>,
|
||||||
mut event_loop: F,
|
mut event_loop: F,
|
||||||
) where
|
) where
|
||||||
F: FnMut(Handle, UnixStream) -> Fut,
|
F: FnMut(UnixStream) -> Fut,
|
||||||
Fut: futures::Future<Output = io::Result<Never>>,
|
Fut: futures::Future<Output = io::Result<Never>>,
|
||||||
{
|
{
|
||||||
let worker_pid = std::process::id();
|
let worker_pid = std::process::id();
|
||||||
@@ -68,13 +65,12 @@ pub fn worker_event_loop<F, Fut>(
|
|||||||
|
|
||||||
// Run the main worker loop.
|
// Run the main worker loop.
|
||||||
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
|
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
|
||||||
let handle = rt.handle();
|
|
||||||
let err = rt
|
let err = rt
|
||||||
.block_on(async move {
|
.block_on(async move {
|
||||||
let stream = UnixStream::connect(socket_path).await?;
|
let stream = UnixStream::connect(socket_path).await?;
|
||||||
let _ = tokio::fs::remove_file(socket_path).await;
|
let _ = tokio::fs::remove_file(socket_path).await;
|
||||||
|
|
||||||
let result = event_loop(handle.clone(), stream).await;
|
let result = event_loop(stream).await;
|
||||||
|
|
||||||
result
|
result
|
||||||
})
|
})
|
||||||
@@ -108,8 +104,10 @@ pub fn cpu_time_monitor_loop(
|
|||||||
|
|
||||||
// Treat the timeout as CPU time, which is less subject to variance due to load.
|
// Treat the timeout as CPU time, which is less subject to variance due to load.
|
||||||
if cpu_time_elapsed <= timeout {
|
if cpu_time_elapsed <= timeout {
|
||||||
// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
|
// Sleep for the remaining CPU time, plus a bit to account for overhead. (And we don't
|
||||||
// is wall clock time. The CPU clock may be slower than the wall clock.
|
// want to wake up too often -- so, since we just want to halt the worker thread if it
|
||||||
|
// stalled, we can sleep longer than necessary.) Note that the sleep is wall clock time.
|
||||||
|
// The CPU clock may be slower than the wall clock.
|
||||||
let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
|
let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
|
||||||
match finished_rx.recv_timeout(sleep_interval) {
|
match finished_rx.recv_timeout(sleep_interval) {
|
||||||
// Received finish signal.
|
// Received finish signal.
|
||||||
@@ -124,6 +122,20 @@ pub fn cpu_time_monitor_loop(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Turn an opaque panic payload into a human-readable message.
///
/// Payloads produced by `panic!` are usually either a `&'static str` (literal message) or a
/// `String` (formatted message); both are recovered verbatim. Any other payload type cannot be
/// inspected, so a fixed placeholder is returned instead. This is best effort only.
pub fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
    // Try the literal-message case first; on failure we get the untouched payload back.
    let payload = match payload.downcast::<&'static str>() {
        Ok(literal) => return literal.to_string(),
        Err(other) => other,
    };

    // Then the formatted-message case, falling back to a placeholder.
    payload
        .downcast::<String>()
        .map(|owned| *owned)
        // At least we tried...
        .unwrap_or_else(|_| "unknown panic payload".to_string())
}
|
||||||
|
|
||||||
/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
|
/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
|
||||||
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
|
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
|
||||||
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
|
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
|
||||||
@@ -140,3 +152,123 @@ fn kill_parent_node_in_emergency() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Functionality related to threads spawned by the workers.
///
/// The motivation for this module is to coordinate worker threads without using async Rust.
///
/// Several threads share one [`Cond`]. Each thread is spawned with the [`WaitOutcome`] it should
/// report when it is done; the first thread to finish (normally or by panicking) stores its
/// outcome and wakes every waiter. Later finishers leave the stored outcome untouched.
pub mod thread {
    use std::{
        panic,
        sync::{Arc, Condvar, Mutex},
        thread,
        time::Duration,
    };

    /// Contains the outcome of waiting on threads, or `Pending` if none are ready.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum WaitOutcome {
        /// Reported by a thread spawned with this outcome once it is done.
        Finished,
        /// Reported by a thread spawned with this outcome once it is done.
        TimedOut,
        /// No thread has reported yet.
        Pending,
    }

    impl WaitOutcome {
        /// Returns `true` while no thread has reported an outcome yet.
        pub fn is_pending(&self) -> bool {
            matches!(self, Self::Pending)
        }
    }

    /// Helper type: the shared outcome flag together with the condvar used to signal changes.
    pub type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>;

    /// Gets a condvar initialized to `Pending`.
    pub fn get_condvar() -> Cond {
        Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()))
    }

    /// Runs a thread, afterwards notifying the threads waiting on the condvar. Catches panics and
    /// resumes them after triggering the condvar, so that the waiting thread is notified on panics.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` from [`thread::Builder::spawn`] if the OS fails to create the
    /// thread.
    pub fn spawn_worker_thread<F, R>(
        name: &str,
        f: F,
        cond: Cond,
        outcome: WaitOutcome,
    ) -> std::io::Result<thread::JoinHandle<R>>
    where
        F: FnOnce() -> R,
        F: Send + 'static + panic::UnwindSafe,
        R: Send + 'static,
    {
        spawn_with_builder(thread::Builder::new().name(name.into()), f, cond, outcome)
    }

    /// Runs a worker thread with the given stack size. See [`spawn_worker_thread`].
    pub fn spawn_worker_thread_with_stack_size<F, R>(
        name: &str,
        f: F,
        cond: Cond,
        outcome: WaitOutcome,
        stack_size: usize,
    ) -> std::io::Result<thread::JoinHandle<R>>
    where
        F: FnOnce() -> R,
        F: Send + 'static + panic::UnwindSafe,
        R: Send + 'static,
    {
        spawn_with_builder(
            thread::Builder::new().name(name.into()).stack_size(stack_size),
            f,
            cond,
            outcome,
        )
    }

    /// Spawns `f` on a thread configured by `builder`, wrapping it so the condvar is notified
    /// when `f` is done. Shared by both public spawn functions.
    fn spawn_with_builder<F, R>(
        builder: thread::Builder,
        f: F,
        cond: Cond,
        outcome: WaitOutcome,
    ) -> std::io::Result<thread::JoinHandle<R>>
    where
        F: FnOnce() -> R,
        F: Send + 'static + panic::UnwindSafe,
        R: Send + 'static,
    {
        builder.spawn(move || cond_notify_on_done(f, cond, outcome))
    }

    /// Runs a function, afterwards notifying the threads waiting on the condvar. Catches panics and
    /// resumes them after triggering the condvar, so that the waiting thread is notified on panics.
    fn cond_notify_on_done<F, R>(f: F, cond: Cond, outcome: WaitOutcome) -> R
    where
        F: FnOnce() -> R,
        F: panic::UnwindSafe,
    {
        let result = panic::catch_unwind(f);
        // Notify before re-raising so that waiters are woken even when `f` panicked.
        cond_notify_all(cond, outcome);
        match result {
            Ok(value) => value,
            Err(payload) => panic::resume_unwind(payload),
        }
    }

    /// Helper function to notify all threads waiting on this condvar.
    ///
    /// Only the first call on a given condvar has an effect: once the flag is no longer
    /// `Pending`, it is left as is and nobody is notified again.
    fn cond_notify_all(cond: Cond, outcome: WaitOutcome) {
        let (lock, cvar) = &*cond;
        let mut flag = lock.lock().unwrap();
        if !flag.is_pending() {
            // Someone else already triggered the condvar.
            return
        }
        *flag = outcome;
        cvar.notify_all();
    }

    /// Block the thread while it waits on the condvar.
    ///
    /// Returns the first outcome that was reported; never returns `Pending`.
    pub fn wait_for_threads(cond: Cond) -> WaitOutcome {
        let (lock, cvar) = &*cond;
        let guard = cvar.wait_while(lock.lock().unwrap(), |flag| flag.is_pending()).unwrap();
        *guard
    }

    /// Block the thread while it waits on the condvar or on a timeout. If the timeout is hit,
    /// returns `None`.
    #[cfg_attr(not(any(target_os = "linux", feature = "jemalloc-allocator")), allow(dead_code))]
    pub fn wait_for_threads_with_timeout(cond: &Cond, dur: Duration) -> Option<WaitOutcome> {
        let (lock, cvar) = &**cond;
        let (guard, timeout_result) = cvar
            .wait_timeout_while(lock.lock().unwrap(), dur, |flag| flag.is_pending())
            .unwrap();
        if timeout_result.timed_out() {
            None
        } else {
            Some(*guard)
        }
    }
}
|
||||||
|
|||||||
@@ -15,12 +15,15 @@
|
|||||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
|
common::{
|
||||||
executor_intf::Executor,
|
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload,
|
||||||
|
thread::{self, WaitOutcome},
|
||||||
|
worker_event_loop,
|
||||||
|
},
|
||||||
|
executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE},
|
||||||
LOG_TARGET,
|
LOG_TARGET,
|
||||||
};
|
};
|
||||||
use cpu_time::ProcessTime;
|
use cpu_time::ProcessTime;
|
||||||
use futures::{pin_mut, select_biased, FutureExt};
|
|
||||||
use parity_scale_codec::{Decode, Encode};
|
use parity_scale_codec::{Decode, Encode};
|
||||||
use polkadot_node_core_pvf::{
|
use polkadot_node_core_pvf::{
|
||||||
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
|
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
|
||||||
@@ -67,18 +70,22 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul
|
|||||||
framed_send(stream, &response.encode()).await
|
framed_send(stream, &response.encode()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
|
/// The entrypoint that the spawned execute worker should start with.
|
||||||
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
|
///
|
||||||
/// is checked against the worker version. A mismatch results in immediate worker termination.
|
/// # Parameters
|
||||||
/// `None` is used for tests and in other situations when version check is not necessary.
|
///
|
||||||
|
/// The `socket_path` specifies the path to the socket used to communicate with the host. The
|
||||||
|
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in
|
||||||
|
/// immediate worker termination. `None` is used for tests and in other situations when version
|
||||||
|
/// check is not necessary.
|
||||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||||
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
|
worker_event_loop("execute", socket_path, node_version, |mut stream| async move {
|
||||||
let worker_pid = std::process::id();
|
let worker_pid = std::process::id();
|
||||||
|
|
||||||
let handshake = recv_handshake(&mut stream).await?;
|
let handshake = recv_handshake(&mut stream).await?;
|
||||||
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
|
let executor = Executor::new(handshake.executor_params).map_err(|e| {
|
||||||
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
|
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
|
||||||
})?);
|
})?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
|
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
|
||||||
@@ -89,31 +96,49 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
artifact_path.display(),
|
artifact_path.display(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Used to signal to the cpu time monitor thread that it can finish.
|
// Condition variable to notify us when a thread is done.
|
||||||
let (finished_tx, finished_rx) = channel::<()>();
|
let condvar = thread::get_condvar();
|
||||||
|
|
||||||
let cpu_time_start = ProcessTime::now();
|
let cpu_time_start = ProcessTime::now();
|
||||||
|
|
||||||
// Spawn a new thread that runs the CPU time monitor.
|
// Spawn a new thread that runs the CPU time monitor.
|
||||||
let cpu_time_monitor_fut = rt_handle
|
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||||
.spawn_blocking(move || {
|
let cpu_time_monitor_thread = thread::spawn_worker_thread(
|
||||||
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
|
"cpu time monitor thread",
|
||||||
})
|
move || {
|
||||||
.fuse();
|
cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx)
|
||||||
|
},
|
||||||
|
Arc::clone(&condvar),
|
||||||
|
WaitOutcome::TimedOut,
|
||||||
|
)?;
|
||||||
let executor_2 = executor.clone();
|
let executor_2 = executor.clone();
|
||||||
let execute_fut = rt_handle
|
let execute_thread = thread::spawn_worker_thread_with_stack_size(
|
||||||
.spawn_blocking(move || {
|
"execute thread",
|
||||||
|
move || {
|
||||||
validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start)
|
validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start)
|
||||||
})
|
},
|
||||||
.fuse();
|
Arc::clone(&condvar),
|
||||||
|
WaitOutcome::Finished,
|
||||||
|
EXECUTE_THREAD_STACK_SIZE,
|
||||||
|
)?;
|
||||||
|
|
||||||
pin_mut!(cpu_time_monitor_fut);
|
let outcome = thread::wait_for_threads(condvar);
|
||||||
pin_mut!(execute_fut);
|
|
||||||
|
|
||||||
let response = select_biased! {
|
let response = match outcome {
|
||||||
// If this future is not selected, the join handle is dropped and the thread will
|
WaitOutcome::Finished => {
|
||||||
// finish in the background.
|
let _ = cpu_time_monitor_tx.send(());
|
||||||
cpu_time_monitor_res = cpu_time_monitor_fut => {
|
execute_thread.join().unwrap_or_else(|e| {
|
||||||
match cpu_time_monitor_res {
|
// TODO: Use `Panic` error once that is implemented.
|
||||||
|
Response::format_internal(
|
||||||
|
"execute thread error",
|
||||||
|
&stringify_panic_payload(e),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
},
|
||||||
|
// If the CPU thread is not selected, we signal it to end, the join handle is
|
||||||
|
// dropped and the thread will finish in the background.
|
||||||
|
WaitOutcome::TimedOut => {
|
||||||
|
match cpu_time_monitor_thread.join() {
|
||||||
Ok(Some(cpu_time_elapsed)) => {
|
Ok(Some(cpu_time_elapsed)) => {
|
||||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||||
gum::warn!(
|
gum::warn!(
|
||||||
@@ -125,14 +150,20 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
);
|
);
|
||||||
Response::TimedOut
|
Response::TimedOut
|
||||||
},
|
},
|
||||||
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
|
Ok(None) => Response::format_internal(
|
||||||
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
|
"cpu time monitor thread error",
|
||||||
|
"error communicating over closed channel".into(),
|
||||||
|
),
|
||||||
|
// We can use an internal error here because errors in this thread are
|
||||||
|
// independent of the candidate.
|
||||||
|
Err(e) => Response::format_internal(
|
||||||
|
"cpu time monitor thread error",
|
||||||
|
&stringify_panic_payload(e),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
execute_res = execute_fut => {
|
WaitOutcome::Pending =>
|
||||||
let _ = finished_tx.send(());
|
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
|
||||||
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
send_response(&mut stream, response).await?;
|
send_response(&mut stream, response).await?;
|
||||||
@@ -143,7 +174,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
fn validate_using_artifact(
|
fn validate_using_artifact(
|
||||||
artifact_path: &Path,
|
artifact_path: &Path,
|
||||||
params: &[u8],
|
params: &[u8],
|
||||||
executor: Arc<Executor>,
|
executor: Executor,
|
||||||
cpu_time_start: ProcessTime,
|
cpu_time_start: ProcessTime,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
// Check here if the file exists, because the error from Substrate is not match-able.
|
// Check here if the file exists, because the error from Substrate is not match-able.
|
||||||
@@ -163,13 +194,15 @@ fn validate_using_artifact(
|
|||||||
Ok(d) => d,
|
Ok(d) => d,
|
||||||
};
|
};
|
||||||
|
|
||||||
let duration = cpu_time_start.elapsed();
|
|
||||||
|
|
||||||
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
|
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
|
||||||
Err(err) =>
|
Err(err) =>
|
||||||
return Response::format_invalid("validation result decoding failed", &err.to_string()),
|
return Response::format_invalid("validation result decoding failed", &err.to_string()),
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Include the decoding in the measured time, to prevent any potential attacks exploiting some
|
||||||
|
// bug in decoding.
|
||||||
|
let duration = cpu_time_start.elapsed();
|
||||||
|
|
||||||
Response::Ok { result_descriptor, duration }
|
Response::Ok { result_descriptor, duration }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,6 +29,42 @@ use std::{
|
|||||||
path::Path,
|
path::Path,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
//    that the stack space is actually available.
//
//    That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
//    more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
//    guard page and the Rust stack overflow handler will be triggered. That leads to an
//    **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
//    Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
//    will abort and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// NATIVE_STACK_MAX is set to 256 MiB, so neither of those default stack sizes is nearly enough
// to back the wasm stack limit.
//
// Hence we need to increase it. The simplest way to fix that is to spawn a thread with the
// desired stack limit.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
/// The stack size for the execute thread.
///
/// Computed as 2 MiB (the default Rust thread stack size) plus `NATIVE_STACK_MAX` (the native
/// stack space that wasm is allowed to use).
pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
|
||||||
|
|
||||||
// Memory configuration
|
// Memory configuration
|
||||||
//
|
//
|
||||||
// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate
|
// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate
|
||||||
@@ -142,60 +178,17 @@ fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, Strin
|
|||||||
Ok(sem)
|
Ok(sem)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Executor {
|
pub struct Executor {
|
||||||
thread_pool: rayon::ThreadPool,
|
|
||||||
config: Config,
|
config: Config,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Executor {
|
impl Executor {
|
||||||
pub fn new(params: ExecutorParams) -> Result<Self, String> {
|
pub fn new(params: ExecutorParams) -> Result<Self, String> {
|
||||||
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
|
|
||||||
// That native code does not create any stacks and just reuses the stack of the thread that
|
|
||||||
// wasmtime was invoked from.
|
|
||||||
//
|
|
||||||
// Also, we configure the executor to provide the deterministic stack and that requires
|
|
||||||
// supplying the amount of the native stack space that wasm is allowed to use. This is
|
|
||||||
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
|
|
||||||
//
|
|
||||||
// There are quirks to that configuration knob:
|
|
||||||
//
|
|
||||||
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
|
|
||||||
// that the stack space is actually available.
|
|
||||||
//
|
|
||||||
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
|
|
||||||
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
|
|
||||||
// guard page and the Rust stack overflow handler will be triggered. That leads to an
|
|
||||||
// **abort**.
|
|
||||||
//
|
|
||||||
// 2. It cannot and does not limit the stack space consumed by Rust code.
|
|
||||||
//
|
|
||||||
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
|
|
||||||
// will abort and that will abort the process as well.
|
|
||||||
//
|
|
||||||
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
|
|
||||||
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
|
|
||||||
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
|
|
||||||
//
|
|
||||||
// Hence we need to increase it.
|
|
||||||
//
|
|
||||||
// The simplest way to fix that is to spawn a thread with the desired stack limit. In order
|
|
||||||
// to avoid costs of creating a thread, we use a thread pool. The execution is
|
|
||||||
// single-threaded hence the thread pool has only one thread.
|
|
||||||
//
|
|
||||||
// The reasoning why we pick this particular size is:
|
|
||||||
//
|
|
||||||
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
|
|
||||||
let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
|
|
||||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
|
||||||
.num_threads(1)
|
|
||||||
.stack_size(thread_stack_size)
|
|
||||||
.build()
|
|
||||||
.map_err(|e| format!("Failed to create thread pool: {:?}", e))?;
|
|
||||||
|
|
||||||
let mut config = DEFAULT_CONFIG.clone();
|
let mut config = DEFAULT_CONFIG.clone();
|
||||||
config.semantics = params_to_wasmtime_semantics(¶ms)?;
|
config.semantics = params_to_wasmtime_semantics(¶ms)?;
|
||||||
|
|
||||||
Ok(Self { thread_pool, config })
|
Ok(Self { config })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
|
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
|
||||||
@@ -216,43 +209,26 @@ impl Executor {
|
|||||||
compiled_artifact_path: &Path,
|
compiled_artifact_path: &Path,
|
||||||
params: &[u8],
|
params: &[u8],
|
||||||
) -> Result<Vec<u8>, String> {
|
) -> Result<Vec<u8>, String> {
|
||||||
let mut result = None;
|
let mut extensions = sp_externalities::Extensions::new();
|
||||||
self.thread_pool.scope({
|
|
||||||
let result = &mut result;
|
extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));
|
||||||
move |s| {
|
|
||||||
s.spawn(move |_| {
|
let mut ext = ValidationExternalities(extensions);
|
||||||
// spawn does not return a value, so we need to use a variable to pass the result.
|
|
||||||
*result = Some(
|
match sc_executor::with_externalities_safe(&mut ext, || {
|
||||||
do_execute(compiled_artifact_path, self.config.clone(), params)
|
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
|
||||||
.map_err(|err| format!("execute error: {:?}", err)),
|
compiled_artifact_path,
|
||||||
);
|
self.config.clone(),
|
||||||
});
|
)?;
|
||||||
}
|
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
|
||||||
});
|
}) {
|
||||||
result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string()))
|
Ok(Ok(ok)) => Ok(ok),
|
||||||
|
Ok(Err(err)) | Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
.map_err(|err| format!("execute error: {:?}", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn do_execute(
|
|
||||||
compiled_artifact_path: &Path,
|
|
||||||
config: Config,
|
|
||||||
params: &[u8],
|
|
||||||
) -> Result<Vec<u8>, sc_executor_common::error::Error> {
|
|
||||||
let mut extensions = sp_externalities::Extensions::new();
|
|
||||||
|
|
||||||
extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));
|
|
||||||
|
|
||||||
let mut ext = ValidationExternalities(extensions);
|
|
||||||
|
|
||||||
sc_executor::with_externalities_safe(&mut ext, || {
|
|
||||||
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
|
|
||||||
compiled_artifact_path,
|
|
||||||
config,
|
|
||||||
)?;
|
|
||||||
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
|
|
||||||
type HostFunctions = (
|
type HostFunctions = (
|
||||||
sp_io::misc::HostFunctions,
|
sp_io::misc::HostFunctions,
|
||||||
sp_io::crypto::HostFunctions,
|
sp_io::crypto::HostFunctions,
|
||||||
|
|||||||
@@ -33,14 +33,13 @@
|
|||||||
/// NOTE: Requires jemalloc enabled.
|
/// NOTE: Requires jemalloc enabled.
|
||||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||||
pub mod memory_tracker {
|
pub mod memory_tracker {
|
||||||
use crate::LOG_TARGET;
|
use crate::{
|
||||||
use polkadot_node_core_pvf::MemoryAllocationStats;
|
common::{stringify_panic_payload, thread},
|
||||||
use std::{
|
LOG_TARGET,
|
||||||
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
|
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
use polkadot_node_core_pvf::MemoryAllocationStats;
|
||||||
|
use std::{thread::JoinHandle, time::Duration};
|
||||||
use tikv_jemalloc_ctl::{epoch, stats, Error};
|
use tikv_jemalloc_ctl::{epoch, stats, Error};
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct MemoryAllocationTracker {
|
struct MemoryAllocationTracker {
|
||||||
@@ -79,16 +78,16 @@ pub mod memory_tracker {
|
|||||||
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
|
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
|
||||||
/// allocation epoch.
|
/// allocation epoch.
|
||||||
///
|
///
|
||||||
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
|
/// 3. When we are notified that preparation has completed, take one last snapshot and return
|
||||||
/// the maximum observed values.
|
/// the maximum observed values.
|
||||||
///
|
///
|
||||||
/// # Errors
|
/// # Errors
|
||||||
///
|
///
|
||||||
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
|
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
|
||||||
/// are used for informational purposes (logging) only.
|
/// are used for informational purposes (logging) only.
|
||||||
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
|
pub fn memory_tracker_loop(condvar: thread::Cond) -> Result<MemoryAllocationStats, String> {
|
||||||
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
|
// NOTE: This doesn't need to be too fine-grained since preparation currently takes 3-10s or
|
||||||
// Apart from that, there is not really a science to this number.
|
// more. Apart from that, there is not really a science to this number.
|
||||||
const POLL_INTERVAL: Duration = Duration::from_millis(100);
|
const POLL_INTERVAL: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
|
let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?;
|
||||||
@@ -109,58 +108,42 @@ pub mod memory_tracker {
|
|||||||
// Take a snapshot and update the max stats.
|
// Take a snapshot and update the max stats.
|
||||||
update_stats()?;
|
update_stats()?;
|
||||||
|
|
||||||
// Sleep.
|
// Sleep for the poll interval, or wake up if the condvar is triggered. Note that
|
||||||
match finished_rx.recv_timeout(POLL_INTERVAL) {
|
// `wait_timeout_while` is documented as not being very precise or reliable, which is
|
||||||
// Received finish signal.
|
// fine here -- see note above.
|
||||||
Ok(()) => {
|
match thread::wait_for_threads_with_timeout(&condvar, POLL_INTERVAL) {
|
||||||
|
Some(_outcome) => {
|
||||||
update_stats()?;
|
update_stats()?;
|
||||||
return Ok(max_stats)
|
return Ok(max_stats)
|
||||||
},
|
},
|
||||||
// Timed out, restart loop.
|
None => continue,
|
||||||
Err(RecvTimeoutError::Timeout) => continue,
|
|
||||||
Err(RecvTimeoutError::Disconnected) =>
|
|
||||||
return Err("memory_tracker_loop: finished_rx disconnected".into()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this
|
/// Helper function to get the stats from the memory tracker. Helps isolate this error handling.
|
||||||
/// error handling.
|
|
||||||
pub async fn get_memory_tracker_loop_stats(
|
pub async fn get_memory_tracker_loop_stats(
|
||||||
fut: JoinHandle<Result<MemoryAllocationStats, String>>,
|
thread: JoinHandle<Result<MemoryAllocationStats, String>>,
|
||||||
tx: Sender<()>,
|
|
||||||
worker_pid: u32,
|
worker_pid: u32,
|
||||||
) -> Option<MemoryAllocationStats> {
|
) -> Option<MemoryAllocationStats> {
|
||||||
// Signal to the memory tracker thread to terminate.
|
match thread.join() {
|
||||||
if let Err(err) = tx.send(()) {
|
Ok(Ok(stats)) => Some(stats),
|
||||||
gum::warn!(
|
Ok(Err(err)) => {
|
||||||
target: LOG_TARGET,
|
gum::warn!(
|
||||||
%worker_pid,
|
target: LOG_TARGET,
|
||||||
"worker: error sending signal to memory tracker_thread: {}",
|
%worker_pid,
|
||||||
err
|
"worker: error occurred in the memory tracker thread: {}", err
|
||||||
);
|
);
|
||||||
None
|
None
|
||||||
} else {
|
},
|
||||||
// Join on the thread handle.
|
Err(err) => {
|
||||||
match fut.await {
|
gum::warn!(
|
||||||
Ok(Ok(stats)) => Some(stats),
|
target: LOG_TARGET,
|
||||||
Ok(Err(err)) => {
|
%worker_pid,
|
||||||
gum::warn!(
|
"worker: error joining on memory tracker thread: {}", stringify_panic_payload(err)
|
||||||
target: LOG_TARGET,
|
);
|
||||||
%worker_pid,
|
None
|
||||||
"worker: error occurred in the memory tracker thread: {}", err
|
},
|
||||||
);
|
|
||||||
None
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
gum::warn!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
%worker_pid,
|
|
||||||
"worker: error joining on memory tracker thread: {}", err
|
|
||||||
);
|
|
||||||
None
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,17 +19,24 @@ use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread
|
|||||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||||
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
|
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
|
common::{
|
||||||
|
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload,
|
||||||
|
thread::{self, WaitOutcome},
|
||||||
|
worker_event_loop,
|
||||||
|
},
|
||||||
prepare, prevalidate, LOG_TARGET,
|
prepare, prevalidate, LOG_TARGET,
|
||||||
};
|
};
|
||||||
use cpu_time::ProcessTime;
|
use cpu_time::ProcessTime;
|
||||||
use futures::{pin_mut, select_biased, FutureExt};
|
|
||||||
use parity_scale_codec::{Decode, Encode};
|
use parity_scale_codec::{Decode, Encode};
|
||||||
use polkadot_node_core_pvf::{
|
use polkadot_node_core_pvf::{
|
||||||
framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult,
|
framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult,
|
||||||
PrepareStats, PvfPrepData,
|
PrepareStats, PvfPrepData,
|
||||||
};
|
};
|
||||||
use std::{any::Any, panic, path::PathBuf, sync::mpsc::channel};
|
use std::{
|
||||||
|
path::PathBuf,
|
||||||
|
sync::{mpsc::channel, Arc},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
use tokio::{io, net::UnixStream};
|
use tokio::{io, net::UnixStream};
|
||||||
|
|
||||||
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
|
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> {
|
||||||
@@ -54,10 +61,14 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re
|
|||||||
framed_send(stream, &result.encode()).await
|
framed_send(stream, &result.encode()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
|
/// The entrypoint that the spawned prepare worker should start with.
|
||||||
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
|
///
|
||||||
/// is checked against the worker version. A mismatch results in immediate worker termination.
|
/// # Parameters
|
||||||
/// `None` is used for tests and in other situations when version check is not necessary.
|
///
|
||||||
|
/// The `socket_path` specifies the path to the socket used to communicate with the host. The
|
||||||
|
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in
|
||||||
|
/// immediate worker termination. `None` is used for tests and in other situations when version
|
||||||
|
/// check is not necessary.
|
||||||
///
|
///
|
||||||
/// # Flow
|
/// # Flow
|
||||||
///
|
///
|
||||||
@@ -69,8 +80,7 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re
|
|||||||
///
|
///
|
||||||
/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
|
/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
|
||||||
///
|
///
|
||||||
/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor
|
/// 4. Wait on the two threads created in step 3.
|
||||||
/// thread will trigger first.
|
|
||||||
///
|
///
|
||||||
/// 5. Stop the memory tracker and get the stats.
|
/// 5. Stop the memory tracker and get the stats.
|
||||||
///
|
///
|
||||||
@@ -79,7 +89,7 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re
|
|||||||
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
|
||||||
/// send that in the `PrepareResult`.
|
/// send that in the `PrepareResult`.
|
||||||
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
||||||
worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move {
|
worker_event_loop("prepare", socket_path, node_version, |mut stream| async move {
|
||||||
let worker_pid = std::process::id();
|
let worker_pid = std::process::id();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@@ -90,74 +100,67 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
"worker: preparing artifact",
|
"worker: preparing artifact",
|
||||||
);
|
);
|
||||||
|
|
||||||
let cpu_time_start = ProcessTime::now();
|
|
||||||
let preparation_timeout = pvf.prep_timeout();
|
let preparation_timeout = pvf.prep_timeout();
|
||||||
|
|
||||||
// Run the memory tracker.
|
// Conditional variable to notify us when a thread is done.
|
||||||
|
let condvar = thread::get_condvar();
|
||||||
|
|
||||||
|
// Run the memory tracker in a regular, non-worker thread.
|
||||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||||
let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
|
let condvar_memory = Arc::clone(&condvar);
|
||||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||||
let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx));
|
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
|
||||||
|
|
||||||
|
let cpu_time_start = ProcessTime::now();
|
||||||
|
|
||||||
// Spawn a new thread that runs the CPU time monitor.
|
// Spawn a new thread that runs the CPU time monitor.
|
||||||
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
|
||||||
let cpu_time_monitor_fut = rt_handle
|
let cpu_time_monitor_thread = thread::spawn_worker_thread(
|
||||||
.spawn_blocking(move || {
|
"cpu time monitor thread",
|
||||||
|
move || {
|
||||||
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
|
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx)
|
||||||
})
|
},
|
||||||
.fuse();
|
Arc::clone(&condvar),
|
||||||
|
WaitOutcome::TimedOut,
|
||||||
|
)?;
|
||||||
// Spawn another thread for preparation.
|
// Spawn another thread for preparation.
|
||||||
let prepare_fut = rt_handle
|
let prepare_thread = thread::spawn_worker_thread(
|
||||||
.spawn_blocking(move || {
|
"prepare thread",
|
||||||
let result = prepare_artifact(pvf);
|
move || {
|
||||||
|
let result = prepare_artifact(pvf, cpu_time_start);
|
||||||
|
|
||||||
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
|
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
let result = result.map(|artifact| (artifact, get_max_rss_thread()));
|
let result = result.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
|
||||||
|
|
||||||
result
|
result
|
||||||
})
|
|
||||||
.fuse();
|
|
||||||
|
|
||||||
pin_mut!(cpu_time_monitor_fut);
|
|
||||||
pin_mut!(prepare_fut);
|
|
||||||
|
|
||||||
let result = select_biased! {
|
|
||||||
// If this future is not selected, the join handle is dropped and the thread will
|
|
||||||
// finish in the background.
|
|
||||||
join_res = cpu_time_monitor_fut => {
|
|
||||||
match join_res {
|
|
||||||
Ok(Some(cpu_time_elapsed)) => {
|
|
||||||
// Log if we exceed the timeout and the other thread hasn't finished.
|
|
||||||
gum::warn!(
|
|
||||||
target: LOG_TARGET,
|
|
||||||
%worker_pid,
|
|
||||||
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
|
|
||||||
cpu_time_elapsed.as_millis(),
|
|
||||||
preparation_timeout.as_millis(),
|
|
||||||
);
|
|
||||||
Err(PrepareError::TimedOut)
|
|
||||||
},
|
|
||||||
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
|
|
||||||
Err(err) => Err(PrepareError::IoErr(err.to_string())),
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
prepare_res = prepare_fut => {
|
Arc::clone(&condvar),
|
||||||
let cpu_time_elapsed = cpu_time_start.elapsed();
|
WaitOutcome::Finished,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let outcome = thread::wait_for_threads(condvar);
|
||||||
|
|
||||||
|
let result = match outcome {
|
||||||
|
WaitOutcome::Finished => {
|
||||||
let _ = cpu_time_monitor_tx.send(());
|
let _ = cpu_time_monitor_tx.send(());
|
||||||
|
|
||||||
match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
|
match prepare_thread.join().unwrap_or_else(|err| {
|
||||||
|
Err(PrepareError::Panic(stringify_panic_payload(err)))
|
||||||
|
}) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Serialized error will be written into the socket.
|
// Serialized error will be written into the socket.
|
||||||
Err(err)
|
Err(err)
|
||||||
},
|
},
|
||||||
Ok(ok) => {
|
Ok(ok) => {
|
||||||
|
#[cfg(not(target_os = "linux"))]
|
||||||
|
let (artifact, cpu_time_elapsed) = ok;
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
let (artifact, cpu_time_elapsed, max_rss) = ok;
|
||||||
|
|
||||||
// Stop the memory stats worker and get its observed memory stats.
|
// Stop the memory stats worker and get its observed memory stats.
|
||||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||||
let memory_tracker_stats =
|
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid).await;
|
||||||
get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
|
|
||||||
#[cfg(target_os = "linux")]
|
|
||||||
let (ok, max_rss) = ok;
|
|
||||||
let memory_stats = MemoryStats {
|
let memory_stats = MemoryStats {
|
||||||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
|
||||||
memory_tracker_stats,
|
memory_tracker_stats,
|
||||||
@@ -178,12 +181,36 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
"worker: writing artifact to {}",
|
"worker: writing artifact to {}",
|
||||||
dest.display(),
|
dest.display(),
|
||||||
);
|
);
|
||||||
tokio::fs::write(&dest, &ok).await?;
|
tokio::fs::write(&dest, &artifact).await?;
|
||||||
|
|
||||||
Ok(PrepareStats{cpu_time_elapsed, memory_stats})
|
Ok(PrepareStats { cpu_time_elapsed, memory_stats })
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
// If the CPU thread is not selected, we signal it to end, the join handle is
|
||||||
|
// dropped and the thread will finish in the background.
|
||||||
|
WaitOutcome::TimedOut => {
|
||||||
|
match cpu_time_monitor_thread.join() {
|
||||||
|
Ok(Some(cpu_time_elapsed)) => {
|
||||||
|
// Log if we exceed the timeout and the other thread hasn't finished.
|
||||||
|
gum::warn!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
%worker_pid,
|
||||||
|
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
|
||||||
|
cpu_time_elapsed.as_millis(),
|
||||||
|
preparation_timeout.as_millis(),
|
||||||
|
);
|
||||||
|
Err(PrepareError::TimedOut)
|
||||||
|
},
|
||||||
|
Ok(None) => Err(PrepareError::IoErr(
|
||||||
|
"error communicating over closed channel".into(),
|
||||||
|
)),
|
||||||
|
// Errors in this thread are independent of the candidate.
|
||||||
|
Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
WaitOutcome::Pending =>
|
||||||
|
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
|
||||||
};
|
};
|
||||||
|
|
||||||
send_response(&mut stream, result).await?;
|
send_response(&mut stream, result).await?;
|
||||||
@@ -191,32 +218,18 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn prepare_artifact(pvf: PvfPrepData) -> Result<CompiledArtifact, PrepareError> {
|
fn prepare_artifact(
|
||||||
panic::catch_unwind(|| {
|
pvf: PvfPrepData,
|
||||||
let blob = match prevalidate(&pvf.code()) {
|
cpu_time_start: ProcessTime,
|
||||||
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
|
) -> Result<(CompiledArtifact, Duration), PrepareError> {
|
||||||
Ok(b) => b,
|
let blob = match prevalidate(&pvf.code()) {
|
||||||
};
|
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
|
||||||
|
Ok(b) => b,
|
||||||
|
};
|
||||||
|
|
||||||
match prepare(blob, &pvf.executor_params()) {
|
match prepare(blob, &pvf.executor_params()) {
|
||||||
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
|
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
|
||||||
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
|
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
|
||||||
}
|
|
||||||
})
|
|
||||||
.map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload)))
|
|
||||||
.and_then(|inner_result| inner_result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempt to convert an opaque panic payload to a string.
|
|
||||||
///
|
|
||||||
/// This is a best effort, and is not guaranteed to provide the most accurate value.
|
|
||||||
fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
|
|
||||||
match payload.downcast::<&'static str>() {
|
|
||||||
Ok(msg) => msg.to_string(),
|
|
||||||
Err(payload) => match payload.downcast::<String>() {
|
|
||||||
Ok(msg) => *msg,
|
|
||||||
// At least we tried...
|
|
||||||
Err(_) => "unknown panic payload".to_string(),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
.map(|artifact| (artifact, cpu_time_start.elapsed()))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user