From b75b137b0f5f6b6fc86a5c8ef2aca2638eb93085 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 16 May 2023 14:34:58 -0400 Subject: [PATCH] PVF: Remove `rayon` and some uses of `tokio` (#7153) --- polkadot/node/core/pvf/worker/src/common.rs | 152 ++++++++++++++- polkadot/node/core/pvf/worker/src/execute.rs | 107 +++++++---- .../node/core/pvf/worker/src/executor_intf.rs | 134 ++++++------- .../node/core/pvf/worker/src/memory_stats.rs | 87 ++++----- polkadot/node/core/pvf/worker/src/prepare.rs | 177 ++++++++++-------- 5 files changed, 397 insertions(+), 260 deletions(-) diff --git a/polkadot/node/core/pvf/worker/src/common.rs b/polkadot/node/core/pvf/worker/src/common.rs index 84bc88701d..00289737a5 100644 --- a/polkadot/node/core/pvf/worker/src/common.rs +++ b/polkadot/node/core/pvf/worker/src/common.rs @@ -18,15 +18,12 @@ use crate::LOG_TARGET; use cpu_time::ProcessTime; use futures::never::Never; use std::{ + any::Any, path::PathBuf, sync::mpsc::{Receiver, RecvTimeoutError}, time::Duration, }; -use tokio::{ - io, - net::UnixStream, - runtime::{Handle, Runtime}, -}; +use tokio::{io, net::UnixStream, runtime::Runtime}; /// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the /// child process. @@ -44,7 +41,7 @@ pub fn worker_event_loop( node_version: Option<&str>, mut event_loop: F, ) where - F: FnMut(Handle, UnixStream) -> Fut, + F: FnMut(UnixStream) -> Fut, Fut: futures::Future>, { let worker_pid = std::process::id(); @@ -68,13 +65,12 @@ pub fn worker_event_loop( // Run the main worker loop. let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); - let handle = rt.handle(); let err = rt .block_on(async move { let stream = UnixStream::connect(socket_path).await?; let _ = tokio::fs::remove_file(socket_path).await; - let result = event_loop(handle.clone(), stream).await; + let result = event_loop(stream).await; result }) @@ -108,8 +104,10 @@ pub fn cpu_time_monitor_loop( // Treat the timeout as CPU time, which is less subject to variance due to load. if cpu_time_elapsed <= timeout { - // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep - // is wall clock time. The CPU clock may be slower than the wall clock. + // Sleep for the remaining CPU time, plus a bit to account for overhead. (And we don't + // want to wake up too often -- so, since we just want to halt the worker thread if it + // stalled, we can sleep longer than necessary.) Note that the sleep is wall clock time. + // The CPU clock may be slower than the wall clock. let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD; match finished_rx.recv_timeout(sleep_interval) { // Received finish signal. @@ -124,6 +122,20 @@ pub fn cpu_time_monitor_loop( } } +/// Attempt to convert an opaque panic payload to a string. +/// +/// This is a best effort, and is not guaranteed to provide the most accurate value. +pub fn stringify_panic_payload(payload: Box) -> String { + match payload.downcast::<&'static str>() { + Ok(msg) => msg.to_string(), + Err(payload) => match payload.downcast::() { + Ok(msg) => *msg, + // At least we tried... + Err(_) => "unknown panic payload".to_string(), + }, + } +} + /// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM` /// to the node to tear it down and prevent it from raising disputes on valid candidates. Node /// restart should be handled by the node owner. As node exits, unix sockets opened to workers @@ -140,3 +152,123 @@ fn kill_parent_node_in_emergency() { } } } + +/// Functionality related to threads spawned by the workers. +/// +/// The motivation for this module is to coordinate worker threads without using async Rust. +pub mod thread { + use std::{ + panic, + sync::{Arc, Condvar, Mutex}, + thread, + time::Duration, + }; + + /// Contains the outcome of waiting on threads, or `Pending` if none are ready. + #[derive(Clone, Copy)] + pub enum WaitOutcome { + Finished, + TimedOut, + Pending, + } + + impl WaitOutcome { + pub fn is_pending(&self) -> bool { + matches!(self, Self::Pending) + } + } + + /// Helper type. + pub type Cond = Arc<(Mutex, Condvar)>; + + /// Gets a condvar initialized to `Pending`. + pub fn get_condvar() -> Cond { + Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new())) + } + + /// Runs a thread, afterwards notifying the threads waiting on the condvar. Catches panics and + /// resumes them after triggering the condvar, so that the waiting thread is notified on panics. + pub fn spawn_worker_thread( + name: &str, + f: F, + cond: Cond, + outcome: WaitOutcome, + ) -> std::io::Result> + where + F: FnOnce() -> R, + F: Send + 'static + panic::UnwindSafe, + R: Send + 'static, + { + thread::Builder::new() + .name(name.into()) + .spawn(move || cond_notify_on_done(f, cond, outcome)) + } + + /// Runs a worker thread with the given stack size. See [`spawn_worker_thread`]. + pub fn spawn_worker_thread_with_stack_size( + name: &str, + f: F, + cond: Cond, + outcome: WaitOutcome, + stack_size: usize, + ) -> std::io::Result> + where + F: FnOnce() -> R, + F: Send + 'static + panic::UnwindSafe, + R: Send + 'static, + { + thread::Builder::new() + .name(name.into()) + .stack_size(stack_size) + .spawn(move || cond_notify_on_done(f, cond, outcome)) + } + + /// Runs a function, afterwards notifying the threads waiting on the condvar. Catches panics and + /// resumes them after triggering the condvar, so that the waiting thread is notified on panics. + fn cond_notify_on_done(f: F, cond: Cond, outcome: WaitOutcome) -> R + where + F: FnOnce() -> R, + F: panic::UnwindSafe, + { + let result = panic::catch_unwind(|| f()); + cond_notify_all(cond, outcome); + match result { + Ok(inner) => return inner, + Err(err) => panic::resume_unwind(err), + } + } + + /// Helper function to notify all threads waiting on this condvar. + fn cond_notify_all(cond: Cond, outcome: WaitOutcome) { + let (lock, cvar) = &*cond; + let mut flag = lock.lock().unwrap(); + if !flag.is_pending() { + // Someone else already triggered the condvar. + return + } + *flag = outcome; + cvar.notify_all(); + } + + /// Block the thread while it waits on the condvar. + pub fn wait_for_threads(cond: Cond) -> WaitOutcome { + let (lock, cvar) = &*cond; + let guard = cvar.wait_while(lock.lock().unwrap(), |flag| flag.is_pending()).unwrap(); + *guard + } + + /// Block the thread while it waits on the condvar or on a timeout. If the timeout is hit, + /// returns `None`. + #[cfg_attr(not(any(target_os = "linux", feature = "jemalloc-allocator")), allow(dead_code))] + pub fn wait_for_threads_with_timeout(cond: &Cond, dur: Duration) -> Option { + let (lock, cvar) = &**cond; + let result = cvar + .wait_timeout_while(lock.lock().unwrap(), dur, |flag| flag.is_pending()) + .unwrap(); + if result.1.timed_out() { + None + } else { + Some(*result.0) + } + } +} diff --git a/polkadot/node/core/pvf/worker/src/execute.rs b/polkadot/node/core/pvf/worker/src/execute.rs index 9f6ff164a2..78f1f700ad 100644 --- a/polkadot/node/core/pvf/worker/src/execute.rs +++ b/polkadot/node/core/pvf/worker/src/execute.rs @@ -15,12 +15,15 @@ // along with Polkadot. If not, see . use crate::{ - common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop}, - executor_intf::Executor, + common::{ + bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload, + thread::{self, WaitOutcome}, + worker_event_loop, + }, + executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE}, LOG_TARGET, }; use cpu_time::ProcessTime; -use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf::{ framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response, @@ -67,18 +70,22 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul framed_send(stream, &response.encode()).await } -/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies -/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, -/// is checked against the worker version. A mismatch results in immediate worker termination. -/// `None` is used for tests and in other situations when version check is not necessary. +/// The entrypoint that the spawned execute worker should start with. +/// +/// # Parameters +/// +/// The `socket_path` specifies the path to the socket used to communicate with the host. The +/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in +/// immediate worker termination. `None` is used for tests and in other situations when version +/// check is not necessary. pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { - worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move { + worker_event_loop("execute", socket_path, node_version, |mut stream| async move { let worker_pid = std::process::id(); let handshake = recv_handshake(&mut stream).await?; - let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| { + let executor = Executor::new(handshake.executor_params).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) - })?); + })?; loop { let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; @@ -89,31 +96,49 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { artifact_path.display(), ); - // Used to signal to the cpu time monitor thread that it can finish. - let (finished_tx, finished_rx) = channel::<()>(); + // Conditional variable to notify us when a thread is done. + let condvar = thread::get_condvar(); + let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. - let cpu_time_monitor_fut = rt_handle - .spawn_blocking(move || { - cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx) - }) - .fuse(); + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_thread = thread::spawn_worker_thread( + "cpu time monitor thread", + move || { + cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx) + }, + Arc::clone(&condvar), + WaitOutcome::TimedOut, + )?; let executor_2 = executor.clone(); - let execute_fut = rt_handle - .spawn_blocking(move || { + let execute_thread = thread::spawn_worker_thread_with_stack_size( + "execute thread", + move || { validate_using_artifact(&artifact_path, ¶ms, executor_2, cpu_time_start) - }) - .fuse(); + }, + Arc::clone(&condvar), + WaitOutcome::Finished, + EXECUTE_THREAD_STACK_SIZE, + )?; - pin_mut!(cpu_time_monitor_fut); - pin_mut!(execute_fut); + let outcome = thread::wait_for_threads(condvar); - let response = select_biased! { - // If this future is not selected, the join handle is dropped and the thread will - // finish in the background. - cpu_time_monitor_res = cpu_time_monitor_fut => { - match cpu_time_monitor_res { + let response = match outcome { + WaitOutcome::Finished => { + let _ = cpu_time_monitor_tx.send(()); + execute_thread.join().unwrap_or_else(|e| { + // TODO: Use `Panic` error once that is implemented. + Response::format_internal( + "execute thread error", + &stringify_panic_payload(e), + ) + }) + }, + // If the CPU thread is not selected, we signal it to end, the join handle is + // dropped and the thread will finish in the background. + WaitOutcome::TimedOut => { + match cpu_time_monitor_thread.join() { Ok(Some(cpu_time_elapsed)) => { // Log if we exceed the timeout and the other thread hasn't finished. gum::warn!( @@ -125,14 +150,20 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { ); Response::TimedOut }, - Ok(None) => Response::InternalError("error communicating over finished channel".into()), - Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()), + Ok(None) => Response::format_internal( + "cpu time monitor thread error", + "error communicating over closed channel".into(), + ), + // We can use an internal error here because errors in this thread are + // independent of the candidate. + Err(e) => Response::format_internal( + "cpu time monitor thread error", + &stringify_panic_payload(e), + ), } }, - execute_res = execute_fut => { - let _ = finished_tx.send(()); - execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string())) - }, + WaitOutcome::Pending => + unreachable!("we run wait_while until the outcome is no longer pending; qed"), }; send_response(&mut stream, response).await?; @@ -143,7 +174,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { fn validate_using_artifact( artifact_path: &Path, params: &[u8], - executor: Arc, + executor: Executor, cpu_time_start: ProcessTime, ) -> Response { // Check here if the file exists, because the error from Substrate is not match-able. @@ -163,13 +194,15 @@ fn validate_using_artifact( Ok(d) => d, }; - let duration = cpu_time_start.elapsed(); - let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) { Err(err) => return Response::format_invalid("validation result decoding failed", &err.to_string()), Ok(r) => r, }; + // Include the decoding in the measured time, to prevent any potential attacks exploiting some + // bug in decoding. + let duration = cpu_time_start.elapsed(); + Response::Ok { result_descriptor, duration } } diff --git a/polkadot/node/core/pvf/worker/src/executor_intf.rs b/polkadot/node/core/pvf/worker/src/executor_intf.rs index 54bf6fd6bc..05d3b41c76 100644 --- a/polkadot/node/core/pvf/worker/src/executor_intf.rs +++ b/polkadot/node/core/pvf/worker/src/executor_intf.rs @@ -29,6 +29,42 @@ use std::{ path::Path, }; +// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code. +// That native code does not create any stacks and just reuses the stack of the thread that +// wasmtime was invoked from. +// +// Also, we configure the executor to provide the deterministic stack and that requires +// supplying the amount of the native stack space that wasm is allowed to use. This is +// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`. +// +// There are quirks to that configuration knob: +// +// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check +// that the stack space is actually available. +// +// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes +// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the +// guard page and the Rust stack overflow handler will be triggered. That leads to an +// **abort**. +// +// 2. It cannot and does not limit the stack space consumed by Rust code. +// +// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code +// will abort and that will abort the process as well. +// +// Typically on Linux the main thread gets the stack size specified by the `ulimit` and +// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the +// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough. +// +// Hence we need to increase it. The simplest way to fix that is to spawn a thread with the desired +// stack limit. +// +// The reasoning why we pick this particular size is: +// +// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack. +/// The stack size for the execute thread. +pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize; + // Memory configuration // // When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate @@ -142,60 +178,17 @@ fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result Result { - // Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code. - // That native code does not create any stacks and just reuses the stack of the thread that - // wasmtime was invoked from. - // - // Also, we configure the executor to provide the deterministic stack and that requires - // supplying the amount of the native stack space that wasm is allowed to use. This is - // realized by supplying the limit into `wasmtime::Config::max_wasm_stack`. - // - // There are quirks to that configuration knob: - // - // 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check - // that the stack space is actually available. - // - // That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes - // more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the - // guard page and the Rust stack overflow handler will be triggered. That leads to an - // **abort**. - // - // 2. It cannot and does not limit the stack space consumed by Rust code. - // - // Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code - // will abort and that will abort the process as well. - // - // Typically on Linux the main thread gets the stack size specified by the `ulimit` and - // typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the - // NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough. - // - // Hence we need to increase it. - // - // The simplest way to fix that is to spawn a thread with the desired stack limit. In order - // to avoid costs of creating a thread, we use a thread pool. The execution is - // single-threaded hence the thread pool has only one thread. - // - // The reasoning why we pick this particular size is: - // - // The default Rust thread stack limit 2 MiB + 256 MiB wasm stack. - let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize; - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(1) - .stack_size(thread_stack_size) - .build() - .map_err(|e| format!("Failed to create thread pool: {:?}", e))?; - let mut config = DEFAULT_CONFIG.clone(); config.semantics = params_to_wasmtime_semantics(¶ms)?; - Ok(Self { thread_pool, config }) + Ok(Self { config }) } /// Executes the given PVF in the form of a compiled artifact and returns the result of execution @@ -216,43 +209,26 @@ impl Executor { compiled_artifact_path: &Path, params: &[u8], ) -> Result, String> { - let mut result = None; - self.thread_pool.scope({ - let result = &mut result; - move |s| { - s.spawn(move |_| { - // spawn does not return a value, so we need to use a variable to pass the result. - *result = Some( - do_execute(compiled_artifact_path, self.config.clone(), params) - .map_err(|err| format!("execute error: {:?}", err)), - ); - }); - } - }); - result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string())) + let mut extensions = sp_externalities::Extensions::new(); + + extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion)); + + let mut ext = ValidationExternalities(extensions); + + match sc_executor::with_externalities_safe(&mut ext, || { + let runtime = sc_executor_wasmtime::create_runtime_from_artifact::( + compiled_artifact_path, + self.config.clone(), + )?; + runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params) + }) { + Ok(Ok(ok)) => Ok(ok), + Ok(Err(err)) | Err(err) => Err(err), + } + .map_err(|err| format!("execute error: {:?}", err)) } } -unsafe fn do_execute( - compiled_artifact_path: &Path, - config: Config, - params: &[u8], -) -> Result, sc_executor_common::error::Error> { - let mut extensions = sp_externalities::Extensions::new(); - - extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion)); - - let mut ext = ValidationExternalities(extensions); - - sc_executor::with_externalities_safe(&mut ext, || { - let runtime = sc_executor_wasmtime::create_runtime_from_artifact::( - compiled_artifact_path, - config, - )?; - runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params) - })? -} - type HostFunctions = ( sp_io::misc::HostFunctions, sp_io::crypto::HostFunctions, diff --git a/polkadot/node/core/pvf/worker/src/memory_stats.rs b/polkadot/node/core/pvf/worker/src/memory_stats.rs index 945c849eb1..907f793d87 100644 --- a/polkadot/node/core/pvf/worker/src/memory_stats.rs +++ b/polkadot/node/core/pvf/worker/src/memory_stats.rs @@ -33,14 +33,13 @@ /// NOTE: Requires jemalloc enabled. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] pub mod memory_tracker { - use crate::LOG_TARGET; - use polkadot_node_core_pvf::MemoryAllocationStats; - use std::{ - sync::mpsc::{Receiver, RecvTimeoutError, Sender}, - time::Duration, + use crate::{ + common::{stringify_panic_payload, thread}, + LOG_TARGET, }; + use polkadot_node_core_pvf::MemoryAllocationStats; + use std::{thread::JoinHandle, time::Duration}; use tikv_jemalloc_ctl::{epoch, stats, Error}; - use tokio::task::JoinHandle; #[derive(Clone)] struct MemoryAllocationTracker { @@ -79,16 +78,16 @@ pub mod memory_tracker { /// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the /// allocation epoch. /// - /// 3. When we receive a signal that preparation has completed, take one last snapshot and return + /// 3. When we are notified that preparation has completed, take one last snapshot and return /// the maximum observed values. /// /// # Errors /// /// For simplicity, any errors are returned as a string. As this is not a critical component, errors /// are used for informational purposes (logging) only. - pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { - // This doesn't need to be too fine-grained since preparation currently takes 3-10s or more. - // Apart from that, there is not really a science to this number. + pub fn memory_tracker_loop(condvar: thread::Cond) -> Result { + // NOTE: This doesn't need to be too fine-grained since preparation currently takes 3-10s or + // more. Apart from that, there is not really a science to this number. const POLL_INTERVAL: Duration = Duration::from_millis(100); let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; @@ -109,58 +108,42 @@ pub mod memory_tracker { // Take a snapshot and update the max stats. update_stats()?; - // Sleep. - match finished_rx.recv_timeout(POLL_INTERVAL) { - // Received finish signal. - Ok(()) => { + // Sleep for the poll interval, or wake up if the condvar is triggered. Note that + // `wait_timeout_while` is documented as not being very precise or reliable, which is + // fine here -- see note above. + match thread::wait_for_threads_with_timeout(&condvar, POLL_INTERVAL) { + Some(_outcome) => { update_stats()?; return Ok(max_stats) }, - // Timed out, restart loop. - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => - return Err("memory_tracker_loop: finished_rx disconnected".into()), + None => continue, } } } - /// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this - /// error handling. + /// Helper function to get the stats from the memory tracker. Helps isolate this error handling. pub async fn get_memory_tracker_loop_stats( - fut: JoinHandle>, - tx: Sender<()>, + thread: JoinHandle>, worker_pid: u32, ) -> Option { - // Signal to the memory tracker thread to terminate. - if let Err(err) = tx.send(()) { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "worker: error sending signal to memory tracker_thread: {}", - err - ); - None - } else { - // Join on the thread handle. - match fut.await { - Ok(Ok(stats)) => Some(stats), - Ok(Err(err)) => { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "worker: error occurred in the memory tracker thread: {}", err - ); - None - }, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "worker: error joining on memory tracker thread: {}", err - ); - None - }, - } + match thread.join() { + Ok(Ok(stats)) => Some(stats), + Ok(Err(err)) => { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "worker: error occurred in the memory tracker thread: {}", err + ); + None + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "worker: error joining on memory tracker thread: {}", stringify_panic_payload(err) + ); + None + }, } } } diff --git a/polkadot/node/core/pvf/worker/src/prepare.rs b/polkadot/node/core/pvf/worker/src/prepare.rs index 3cec7439f8..36b05318c8 100644 --- a/polkadot/node/core/pvf/worker/src/prepare.rs +++ b/polkadot/node/core/pvf/worker/src/prepare.rs @@ -19,17 +19,24 @@ use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; use crate::{ - common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop}, + common::{ + bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload, + thread::{self, WaitOutcome}, + worker_event_loop, + }, prepare, prevalidate, LOG_TARGET, }; use cpu_time::ProcessTime; -use futures::{pin_mut, select_biased, FutureExt}; use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf::{ framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult, PrepareStats, PvfPrepData, }; -use std::{any::Any, panic, path::PathBuf, sync::mpsc::channel}; +use std::{ + path::PathBuf, + sync::{mpsc::channel, Arc}, + time::Duration, +}; use tokio::{io, net::UnixStream}; async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { @@ -54,10 +61,14 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re framed_send(stream, &result.encode()).await } -/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies -/// the path to the socket used to communicate with the host. The `node_version`, if `Some`, -/// is checked against the worker version. A mismatch results in immediate worker termination. -/// `None` is used for tests and in other situations when version check is not necessary. +/// The entrypoint that the spawned prepare worker should start with. +/// +/// # Parameters +/// +/// The `socket_path` specifies the path to the socket used to communicate with the host. The +/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in +/// immediate worker termination. `None` is used for tests and in other situations when version +/// check is not necessary. /// /// # Flow /// @@ -69,8 +80,7 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re /// /// 3. Start the CPU time monitor loop and the actual preparation in two separate threads. /// -/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor -/// thread will trigger first. +/// 4. Wait on the two threads created in step 3. /// /// 5. Stop the memory tracker and get the stats. /// @@ -79,7 +89,7 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re /// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we /// send that in the `PrepareResult`. pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { - worker_event_loop("prepare", socket_path, node_version, |rt_handle, mut stream| async move { + worker_event_loop("prepare", socket_path, node_version, |mut stream| async move { let worker_pid = std::process::id(); loop { @@ -90,74 +100,67 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { "worker: preparing artifact", ); - let cpu_time_start = ProcessTime::now(); let preparation_timeout = pvf.prep_timeout(); - // Run the memory tracker. + // Conditional variable to notify us when a thread is done. + let condvar = thread::get_condvar(); + + // Run the memory tracker in a regular, non-worker thread. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); + let condvar_memory = Arc::clone(&condvar); #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); + let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory)); + + let cpu_time_start = ProcessTime::now(); // Spawn a new thread that runs the CPU time monitor. let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); - let cpu_time_monitor_fut = rt_handle - .spawn_blocking(move || { + let cpu_time_monitor_thread = thread::spawn_worker_thread( + "cpu time monitor thread", + move || { cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx) - }) - .fuse(); + }, + Arc::clone(&condvar), + WaitOutcome::TimedOut, + )?; // Spawn another thread for preparation. - let prepare_fut = rt_handle - .spawn_blocking(move || { - let result = prepare_artifact(pvf); + let prepare_thread = thread::spawn_worker_thread( + "prepare thread", + move || { + let result = prepare_artifact(pvf, cpu_time_start); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] - let result = result.map(|artifact| (artifact, get_max_rss_thread())); + let result = result.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread())); result - }) - .fuse(); - - pin_mut!(cpu_time_monitor_fut); - pin_mut!(prepare_fut); - - let result = select_biased! { - // If this future is not selected, the join handle is dropped and the thread will - // finish in the background. - join_res = cpu_time_monitor_fut => { - match join_res { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", - cpu_time_elapsed.as_millis(), - preparation_timeout.as_millis(), - ); - Err(PrepareError::TimedOut) - }, - Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), - Err(err) => Err(PrepareError::IoErr(err.to_string())), - } }, - prepare_res = prepare_fut => { - let cpu_time_elapsed = cpu_time_start.elapsed(); + Arc::clone(&condvar), + WaitOutcome::Finished, + )?; + + let outcome = thread::wait_for_threads(condvar); + + let result = match outcome { + WaitOutcome::Finished => { let _ = cpu_time_monitor_tx.send(()); - match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { + match prepare_thread.join().unwrap_or_else(|err| { + Err(PrepareError::Panic(stringify_panic_payload(err))) + }) { Err(err) => { // Serialized error will be written into the socket. Err(err) }, Ok(ok) => { + #[cfg(not(target_os = "linux"))] + let (artifact, cpu_time_elapsed) = ok; + #[cfg(target_os = "linux")] + let (artifact, cpu_time_elapsed, max_rss) = ok; + // Stop the memory stats worker and get its observed memory stats. #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_stats = - get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await; - #[cfg(target_os = "linux")] - let (ok, max_rss) = ok; + let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid).await; let memory_stats = MemoryStats { #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] memory_tracker_stats, @@ -178,12 +181,36 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { "worker: writing artifact to {}", dest.display(), ); - tokio::fs::write(&dest, &ok).await?; + tokio::fs::write(&dest, &artifact).await?; - Ok(PrepareStats{cpu_time_elapsed, memory_stats}) + Ok(PrepareStats { cpu_time_elapsed, memory_stats }) }, } }, + // If the CPU thread is not selected, we signal it to end, the join handle is + // dropped and the thread will finish in the background. + WaitOutcome::TimedOut => { + match cpu_time_monitor_thread.join() { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_time_elapsed.as_millis(), + preparation_timeout.as_millis(), + ); + Err(PrepareError::TimedOut) + }, + Ok(None) => Err(PrepareError::IoErr( + "error communicating over closed channel".into(), + )), + // Errors in this thread are independent of the candidate. + Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))), + } + }, + WaitOutcome::Pending => + unreachable!("we run wait_while until the outcome is no longer pending; qed"), }; send_response(&mut stream, result).await?; @@ -191,32 +218,18 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { }); } -fn prepare_artifact(pvf: PvfPrepData) -> Result { - panic::catch_unwind(|| { - let blob = match prevalidate(&pvf.code()) { - Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), - Ok(b) => b, - }; +fn prepare_artifact( + pvf: PvfPrepData, + cpu_time_start: ProcessTime, +) -> Result<(CompiledArtifact, Duration), PrepareError> { + let blob = match prevalidate(&pvf.code()) { + Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), + Ok(b) => b, + }; - match prepare(blob, &pvf.executor_params()) { - Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), - Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), - } - }) - .map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload))) - .and_then(|inner_result| inner_result) -} - -/// Attempt to convert an opaque panic payload to a string. -/// -/// This is a best effort, and is not guaranteed to provide the most accurate value. -fn stringify_panic_payload(payload: Box) -> String { - match payload.downcast::<&'static str>() { - Ok(msg) => msg.to_string(), - Err(payload) => match payload.downcast::() { - Ok(msg) => *msg, - // At least we tried... - Err(_) => "unknown panic payload".to_string(), - }, + match prepare(blob, &pvf.executor_params()) { + Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), + Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } + .map(|artifact| (artifact, cpu_time_start.elapsed())) }