Replace async-std with tokio in PVF subsystem (#6419)

* Replace async-std with tokio in PVF subsystem

* Rework workers to use `select!` instead of a mutex

The improvement in code readability is more important than the thread overhead.

* Remove unnecessary `fuse`

* Add explanation for `expect()`

* Update node/core/pvf/src/worker_common.rs

Co-authored-by: Bastian Köcher <info@kchr.de>

* Update node/core/pvf/src/worker_common.rs

Co-authored-by: Bastian Köcher <info@kchr.de>

* Address some review comments

* Shutdown tokio runtime

* Run cargo fmt

* Add a small note about retries

* Fix up merge

* Rework `cpu_time_monitor_loop` to return when other thread finishes

* Add error string to PrepareError::IoErr variant

* Log when artifacts fail to prepare

* Fix `cpu_time_monitor_loop`; fix test

* Fix text

* Fix a couple of potential minor data races.

The first data race was due to logging in the CPU monitor thread even if the
job (in the other thread) had finished. The job can technically finish before or after the log.

It might be best to move this log into the `select!`s, where we are guaranteed
to have chosen the timed-out branch, although there would be a bit of
duplication.

Also, it was possible for the CPU monitor thread to complete before we executed
`finished_tx.send` in the other thread, which would trigger an error because the
receiver had already been dropped. Until now, such a spurious error from
`send` would be returned even if the job otherwise succeeded.

* Update Cargo.lock

Co-authored-by: Bastian Köcher <info@kchr.de>
This commit is contained in:
Marcin S
2023-01-10 04:51:13 -05:00
committed by GitHub
parent 166b921912
commit 44fd95661c
18 changed files with 298 additions and 472 deletions
+1 -148
View File
@@ -305,57 +305,6 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-attributes"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "async-channel"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-executor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"slab",
]
[[package]]
name = "async-global-executor"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6"
dependencies = [
"async-channel",
"async-executor",
"async-io",
"async-mutex",
"blocking",
"futures-lite",
"num_cpus",
"once_cell",
]
[[package]] [[package]]
name = "async-io" name = "async-io"
version = "1.6.0" version = "1.6.0"
@@ -384,65 +333,6 @@ dependencies = [
"event-listener", "event-listener",
] ]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-process"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6"
dependencies = [
"async-io",
"blocking",
"cfg-if",
"event-listener",
"futures-lite",
"libc",
"once_cell",
"signal-hook",
"winapi",
]
[[package]]
name = "async-std"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d"
dependencies = [
"async-attributes",
"async-channel",
"async-global-executor",
"async-io",
"async-lock",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log",
"memchr",
"once_cell",
"pin-project-lite 0.2.7",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.58" version = "0.1.58"
@@ -753,20 +643,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
[[package]]
name = "blocking"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]] [[package]]
name = "bounded-vec" name = "bounded-vec"
version = "0.6.0" version = "0.6.0"
@@ -2911,19 +2787,6 @@ dependencies = [
"regex", "regex",
] ]
[[package]]
name = "gloo-timers"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "group" name = "group"
version = "0.12.1" version = "0.12.1"
@@ -3665,15 +3528,6 @@ dependencies = [
"sp-weights", "sp-weights",
] ]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]] [[package]]
name = "kvdb" name = "kvdb"
version = "0.13.0" version = "0.13.0"
@@ -6930,8 +6784,6 @@ version = "0.9.33"
dependencies = [ dependencies = [
"always-assert", "always-assert",
"assert_matches", "assert_matches",
"async-process",
"async-std",
"cpu-time", "cpu-time",
"futures", "futures",
"futures-timer", "futures-timer",
@@ -6956,6 +6808,7 @@ dependencies = [
"tempfile", "tempfile",
"test-parachain-adder", "test-parachain-adder",
"test-parachain-halt", "test-parachain-halt",
"tokio",
"tracing-gum", "tracing-gum",
] ]
@@ -627,7 +627,8 @@ trait ValidationBackend {
self.validate_candidate(pvf.clone(), timeout, params.encode()).await; self.validate_candidate(pvf.clone(), timeout, params.encode()).await;
// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient. // assumption that the conditions that caused this error may have been transient. Note that
// this error is only a result of execution itself and not of preparation.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result validation_result
{ {
@@ -676,12 +677,12 @@ impl ValidationBackend for ValidationHost {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
if let Err(_) = self.precheck_pvf(pvf, tx).await { if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host. // Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr) return Err(PrepareError::IoErr(err))
} }
let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;
precheck_result precheck_result
} }
@@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() {
inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);
inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed); inner(Err(PrepareError::IoErr("fizz".to_owned())), PreCheckOutcome::Failed);
} }
+1 -2
View File
@@ -10,8 +10,6 @@ path = "bin/puppet_worker.rs"
[dependencies] [dependencies]
always-assert = "0.1" always-assert = "0.1"
async-std = { version = "1.11.0", features = ["attributes"] }
async-process = "1.3.0"
assert_matches = "1.4.0" assert_matches = "1.4.0"
cpu-time = "1.0.0" cpu-time = "1.0.0"
futures = "0.3.21" futures = "0.3.21"
@@ -21,6 +19,7 @@ gum = { package = "tracing-gum", path = "../../gum" }
pin-project = "1.0.9" pin-project = "1.0.9"
rand = "0.8.5" rand = "0.8.5"
tempfile = "3.3.0" tempfile = "3.3.0"
tokio = { version = "1.22.0", features = ["fs", "process"] }
rayon = "1.5.1" rayon = "1.5.1"
parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] } parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
+8 -11
View File
@@ -16,10 +16,10 @@
use crate::{error::PrepareError, host::PrepareResultSender}; use crate::{error::PrepareError, host::PrepareResultSender};
use always_assert::always; use always_assert::always;
use async_std::path::{Path, PathBuf};
use polkadot_parachain::primitives::ValidationCodeHash; use polkadot_parachain::primitives::ValidationCodeHash;
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::{Path, PathBuf},
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
@@ -136,8 +136,8 @@ impl Artifacts {
pub async fn new(cache_path: &Path) -> Self { pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all its parents are created. // Make sure that the cache path directory and all its parents are created.
// First delete the entire cache. Nodes are long-running so this should populate shortly. // First delete the entire cache. Nodes are long-running so this should populate shortly.
let _ = async_std::fs::remove_dir_all(cache_path).await; let _ = tokio::fs::remove_dir_all(cache_path).await;
let _ = async_std::fs::create_dir_all(cache_path).await; let _ = tokio::fs::create_dir_all(cache_path).await;
Self { artifacts: HashMap::new() } Self { artifacts: HashMap::new() }
} }
@@ -214,9 +214,8 @@ impl Artifacts {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{ArtifactId, Artifacts}; use super::{ArtifactId, Artifacts};
use async_std::path::Path;
use sp_core::H256; use sp_core::H256;
use std::str::FromStr; use std::{path::Path, str::FromStr};
#[test] #[test]
fn from_file_name() { fn from_file_name() {
@@ -252,11 +251,9 @@ mod tests {
); );
} }
#[test] #[tokio::test]
fn artifacts_removes_cache_on_startup() { async fn artifacts_removes_cache_on_startup() {
let fake_cache_path = async_std::task::block_on(async move { let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap();
crate::worker_common::tmpfile("test-cache").await.unwrap()
});
let fake_artifact_path = { let fake_artifact_path = {
let mut p = fake_cache_path.clone(); let mut p = fake_cache_path.clone();
p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234");
@@ -271,7 +268,7 @@ mod tests {
// this should remove it and re-create. // this should remove it and re-create.
let p = &fake_cache_path; let p = &fake_cache_path;
async_std::task::block_on(async { Artifacts::new(p).await }); Artifacts::new(p).await;
assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0); assert_eq!(std::fs::read_dir(&fake_cache_path).unwrap().count(), 0);
+3 -3
View File
@@ -34,7 +34,7 @@ pub enum PrepareError {
TimedOut, TimedOut,
/// An IO error occurred while receiving the result from the worker process. This state is reported by the /// An IO error occurred while receiving the result from the worker process. This state is reported by the
/// validation host (not by the worker). /// validation host (not by the worker).
IoErr, IoErr(String),
/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
/// validation host (not by the worker). /// validation host (not by the worker).
CreateTmpFileErr(String), CreateTmpFileErr(String),
@@ -54,7 +54,7 @@ impl PrepareError {
use PrepareError::*; use PrepareError::*;
match self { match self {
Prevalidation(_) | Preparation(_) | Panic(_) => true, Prevalidation(_) | Preparation(_) | Panic(_) => true,
TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
} }
} }
} }
@@ -67,7 +67,7 @@ impl fmt::Display for PrepareError {
Preparation(err) => write!(f, "preparation: {}", err), Preparation(err) => write!(f, "preparation: {}", err),
Panic(err) => write!(f, "panic: {}", err), Panic(err) => write!(f, "panic: {}", err),
TimedOut => write!(f, "prepare: timeout"), TimedOut => write!(f, "prepare: timeout"),
IoErr => write!(f, "prepare: io error while receiving response"), IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err),
CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
} }
+1 -2
View File
@@ -24,7 +24,6 @@ use crate::{
worker_common::{IdleWorker, WorkerHandle}, worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET, InvalidCandidate, ValidationError, LOG_TARGET,
}; };
use async_std::path::PathBuf;
use futures::{ use futures::{
channel::mpsc, channel::mpsc,
future::BoxFuture, future::BoxFuture,
@@ -32,7 +31,7 @@ use futures::{
Future, FutureExt, Future, FutureExt,
}; };
use slotmap::HopSlotMap; use slotmap::HopSlotMap;
use std::{collections::VecDeque, fmt, time::Duration}; use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};
slotmap::new_key_type! { struct Worker; } slotmap::new_key_type! { struct Worker; }
+51 -50
View File
@@ -19,30 +19,22 @@ use crate::{
executor_intf::Executor, executor_intf::Executor,
worker_common::{ worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle, spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
}, },
LOG_TARGET, LOG_TARGET,
}; };
use async_std::{
io,
os::unix::net::UnixStream,
path::{Path, PathBuf},
task,
};
use cpu_time::ProcessTime; use cpu_time::ProcessTime;
use futures::FutureExt; use futures::{pin_mut, select_biased, FutureExt};
use futures_timer::Delay; use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult; use polkadot_parachain::primitives::ValidationResult;
use std::{ use std::{
sync::{ path::{Path, PathBuf},
atomic::{AtomicBool, Ordering}, sync::{mpsc::channel, Arc},
Arc,
},
thread,
time::Duration, time::Duration,
}; };
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
/// ///
@@ -235,10 +227,10 @@ impl Response {
/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies /// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. /// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) { pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |mut stream| async move { worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let executor = Executor::new().map_err(|e| { let executor = Arc::new(Executor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?; })?);
loop { loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
@@ -249,52 +241,61 @@ pub fn worker_entrypoint(socket_path: &str) {
artifact_path.display(), artifact_path.display(),
); );
// Create a lock flag. We set it when either thread finishes. // Used to signal to the cpu time monitor thread that it can finish.
let lock = Arc::new(AtomicBool::new(false)); let (finished_tx, finished_rx) = channel::<()>();
let cpu_time_start = ProcessTime::now(); let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor. Continuously wakes up from // Spawn a new thread that runs the CPU time monitor.
// sleeping and then either sleeps for the remaining CPU time, or kills the process if let thread_fut = rt_handle
// we exceed the CPU timeout. .spawn_blocking(move || {
let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) = cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
(stream.clone(), cpu_time_start, execution_timeout, lock.clone()); })
let handle = .fuse();
thread::Builder::new().name("CPU time monitor".into()).spawn(move || { let executor_2 = executor.clone();
task::block_on(async { let execute_fut = rt_handle
cpu_time_monitor_loop( .spawn_blocking(move || {
JobKind::Execute, validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
stream_2, })
cpu_time_start_2, .fuse();
execution_timeout_2,
lock_2,
)
.await;
})
})?;
let response = pin_mut!(thread_fut);
validate_using_artifact(&artifact_path, &params, &executor, cpu_time_start).await; pin_mut!(execute_fut);
let lock_result = let response = select_biased! {
lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); // If this future is not selected, the join handle is dropped and the thread will
if lock_result.is_err() { // finish in the background.
// The other thread is still sending an error response over the socket. Wait on it join_res = thread_fut => {
// and return. match join_res {
let _ = handle.join(); Ok(Some(cpu_time_elapsed)) => {
// Monitor thread detected timeout and likely already terminated the process, // Log if we exceed the timeout and the other thread hasn't finished.
// nothing to do. gum::warn!(
continue target: LOG_TARGET,
} worker_pid = %std::process::id(),
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
);
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::InternalError(format!("{}", e)),
}
},
execute_res = execute_fut => {
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e)))
},
};
send_response(&mut stream, response).await?; send_response(&mut stream, response).await?;
} }
}); });
} }
async fn validate_using_artifact( fn validate_using_artifact(
artifact_path: &Path, artifact_path: &Path,
params: &[u8], params: &[u8],
executor: &Executor, executor: Arc<Executor>,
cpu_time_start: ProcessTime, cpu_time_start: ProcessTime,
) -> Response { ) -> Response {
let descriptor_bytes = match unsafe { let descriptor_bytes = match unsafe {
+29 -20
View File
@@ -28,7 +28,6 @@ use crate::{
prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
}; };
use always_assert::never; use always_assert::never;
use async_std::path::{Path, PathBuf};
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt, Future, FutureExt, SinkExt, StreamExt,
@@ -36,6 +35,7 @@ use futures::{
use polkadot_parachain::primitives::ValidationResult; use polkadot_parachain::primitives::ValidationResult;
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::{Path, PathBuf},
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
@@ -171,7 +171,7 @@ pub struct Config {
impl Config { impl Config {
/// Create a new instance of the configuration. /// Create a new instance of the configuration.
pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self { pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self {
// Do not contaminate the other parts of the codebase with the types from `async_std`. // Do not contaminate the other parts of the codebase with the types from `tokio`.
let cache_path = PathBuf::from(cache_path); let cache_path = PathBuf::from(cache_path);
let program_path = PathBuf::from(program_path); let program_path = PathBuf::from(program_path);
@@ -723,10 +723,19 @@ async fn handle_prepare_done(
*state = match result { *state = match result {
Ok(cpu_time_elapsed) => Ok(cpu_time_elapsed) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed }, ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
Err(error) => ArtifactState::FailedToProcess { Err(error) => {
last_time_failed: SystemTime::now(), gum::debug!(
num_failures: *num_failures + 1, target: LOG_TARGET,
error, artifact_id = ?artifact_id,
num_failures = ?num_failures,
"Failed to process artifact: {}",
error
);
ArtifactState::FailedToProcess {
last_time_failed: SystemTime::now(),
num_failures: *num_failures + 1,
error,
}
}, },
}; };
@@ -778,7 +787,7 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
match sweeper_rx.next().await { match sweeper_rx.next().await {
None => break, None => break,
Some(condemned) => { Some(condemned) => {
let result = async_std::fs::remove_file(&condemned).await; let result = tokio::fs::remove_file(&condemned).await;
gum::trace!( gum::trace!(
target: LOG_TARGET, target: LOG_TARGET,
?result, ?result,
@@ -827,7 +836,7 @@ mod tests {
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
#[async_std::test] #[tokio::test]
async fn pulse_test() { async fn pulse_test() {
let pulse = pulse_every(Duration::from_millis(100)); let pulse = pulse_every(Duration::from_millis(100));
futures::pin_mut!(pulse); futures::pin_mut!(pulse);
@@ -1017,19 +1026,19 @@ mod tests {
} }
} }
#[async_std::test] #[tokio::test]
async fn shutdown_on_handle_drop() { async fn shutdown_on_handle_drop() {
let test = Builder::default().build(); let test = Builder::default().build();
let join_handle = async_std::task::spawn(test.run); let join_handle = tokio::task::spawn(test.run);
// Dropping the handle will lead to conclusion of the read part and thus will make the event // Dropping the handle will lead to conclusion of the read part and thus will make the event
// loop to stop, which in turn will resolve the join handle. // loop to stop, which in turn will resolve the join handle.
drop(test.to_host_tx); drop(test.to_host_tx);
join_handle.await; join_handle.await.unwrap();
} }
#[async_std::test] #[tokio::test]
async fn pruning() { async fn pruning() {
let mock_now = SystemTime::now() - Duration::from_millis(1000); let mock_now = SystemTime::now() - Duration::from_millis(1000);
@@ -1059,7 +1068,7 @@ mod tests {
test.poll_ensure_to_sweeper_is_empty().await; test.poll_ensure_to_sweeper_is_empty().await;
} }
#[async_std::test] #[tokio::test]
async fn execute_pvf_requests() { async fn execute_pvf_requests() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1159,7 +1168,7 @@ mod tests {
); );
} }
#[async_std::test] #[tokio::test]
async fn precheck_pvf() { async fn precheck_pvf() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1214,7 +1223,7 @@ mod tests {
} }
} }
#[async_std::test] #[tokio::test]
async fn test_prepare_done() { async fn test_prepare_done() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1301,7 +1310,7 @@ mod tests {
// Test that multiple prechecking requests do not trigger preparation retries if the first one // Test that multiple prechecking requests do not trigger preparation retries if the first one
// failed. // failed.
#[async_std::test] #[tokio::test]
async fn test_precheck_prepare_retry() { async fn test_precheck_prepare_retry() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1344,7 +1353,7 @@ mod tests {
// Test that multiple execution requests trigger preparation retries if the first one failed due // Test that multiple execution requests trigger preparation retries if the first one failed due
// to a potentially non-reproducible error. // to a potentially non-reproducible error.
#[async_std::test] #[tokio::test]
async fn test_execute_prepare_retry() { async fn test_execute_prepare_retry() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1414,7 +1423,7 @@ mod tests {
// Test that multiple execution requests don't trigger preparation retries if the first one // Test that multiple execution requests don't trigger preparation retries if the first one
// failed due to a reproducible error (e.g. Prevalidation). // failed due to a reproducible error (e.g. Prevalidation).
#[async_std::test] #[tokio::test]
async fn test_execute_prepare_no_retry() { async fn test_execute_prepare_no_retry() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1480,7 +1489,7 @@ mod tests {
} }
// Test that multiple heads-up requests trigger preparation retries if the first one failed. // Test that multiple heads-up requests trigger preparation retries if the first one failed.
#[async_std::test] #[tokio::test]
async fn test_heads_up_prepare_retry() { async fn test_heads_up_prepare_retry() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
@@ -1521,7 +1530,7 @@ mod tests {
); );
} }
#[async_std::test] #[tokio::test]
async fn cancellation() { async fn cancellation() {
let mut test = Builder::default().build(); let mut test = Builder::default().build();
let mut host = test.host_handle(); let mut host = test.host_handle();
+1
View File
@@ -113,6 +113,7 @@ pub use pvf::Pvf;
pub use host::{start, Config, ValidationHost}; pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics; pub use metrics::Metrics;
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;
pub use execute::worker_entrypoint as execute_worker_entrypoint; pub use execute::worker_entrypoint as execute_worker_entrypoint;
pub use prepare::worker_entrypoint as prepare_worker_entrypoint; pub use prepare::worker_entrypoint as prepare_worker_entrypoint;
+9 -4
View File
@@ -22,12 +22,17 @@ use crate::{
LOG_TARGET, LOG_TARGET,
}; };
use always_assert::never; use always_assert::never;
use async_std::path::{Path, PathBuf};
use futures::{ use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
}; };
use slotmap::HopSlotMap; use slotmap::HopSlotMap;
use std::{fmt, sync::Arc, task::Poll, time::Duration}; use std::{
fmt,
path::{Path, PathBuf},
sync::Arc,
task::Poll,
time::Duration,
};
slotmap::new_key_type! { pub struct Worker; } slotmap::new_key_type! { pub struct Worker; }
@@ -322,14 +327,14 @@ fn handle_mux(
Ok(()) Ok(())
}, },
Outcome::IoErr => { Outcome::IoErr(err) => {
if attempt_retire(metrics, spawned, worker) { if attempt_retire(metrics, spawned, worker) {
reply( reply(
from_pool, from_pool,
FromPool::Concluded { FromPool::Concluded {
worker, worker,
rip: true, rip: true,
result: Err(PrepareError::IoErr), result: Err(PrepareError::IoErr(err)),
}, },
)?; )?;
} }
+8 -8
View File
@@ -19,10 +19,10 @@
use super::pool::{self, Worker}; use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET};
use always_assert::{always, never}; use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
path::PathBuf,
time::Duration, time::Duration,
}; };
@@ -603,7 +603,7 @@ mod tests {
} }
} }
#[async_std::test] #[tokio::test]
async fn properly_concludes() { async fn properly_concludes() {
let mut test = Test::new(2, 2); let mut test = Test::new(2, 2);
@@ -625,7 +625,7 @@ mod tests {
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
} }
#[async_std::test] #[tokio::test]
async fn dont_spawn_over_soft_limit_unless_critical() { async fn dont_spawn_over_soft_limit_unless_critical() {
let mut test = Test::new(2, 3); let mut test = Test::new(2, 3);
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
@@ -669,7 +669,7 @@ mod tests {
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
} }
#[async_std::test] #[tokio::test]
async fn cull_unwanted() { async fn cull_unwanted() {
let mut test = Test::new(1, 2); let mut test = Test::new(1, 2);
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
@@ -707,7 +707,7 @@ mod tests {
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
} }
#[async_std::test] #[tokio::test]
async fn worker_mass_die_out_doesnt_stall_queue() { async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2); let mut test = Test::new(2, 2);
@@ -741,7 +741,7 @@ mod tests {
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
} }
#[async_std::test] #[tokio::test]
async fn doesnt_resurrect_ripped_worker_if_no_work() { async fn doesnt_resurrect_ripped_worker_if_no_work() {
let mut test = Test::new(2, 2); let mut test = Test::new(2, 2);
@@ -761,12 +761,12 @@ mod tests {
test.send_from_pool(pool::FromPool::Concluded { test.send_from_pool(pool::FromPool::Concluded {
worker: w1, worker: w1,
rip: true, rip: true,
result: Err(PrepareError::IoErr), result: Err(PrepareError::IoErr("test".into())),
}); });
test.poll_ensure_to_pool_is_empty().await; test.poll_ensure_to_pool_is_empty().await;
} }
#[async_std::test] #[tokio::test]
async fn rip_for_start_work() { async fn rip_for_start_work() {
let mut test = Test::new(2, 2); let mut test = Test::new(2, 2);
+75 -83
View File
@@ -19,29 +19,22 @@ use crate::{
error::{PrepareError, PrepareResult}, error::{PrepareError, PrepareResult},
worker_common::{ worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
}, },
LOG_TARGET, LOG_TARGET,
}; };
use async_std::{
io,
os::unix::net::UnixStream,
path::{Path, PathBuf},
task,
};
use cpu_time::ProcessTime; use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay; use sp_core::hexdisplay::HexDisplay;
use std::{ use std::{
panic, panic,
sync::{ path::{Path, PathBuf},
atomic::{AtomicBool, Ordering}, sync::{mpsc::channel, Arc},
Arc,
},
thread,
time::Duration, time::Duration,
}; };
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
/// ///
@@ -71,7 +64,7 @@ pub enum Outcome {
/// An IO error occurred while receiving the result from the worker process. /// An IO error occurred while receiving the result from the worker process.
/// ///
/// This doesn't return an idle worker instance, thus this worker is no longer usable. /// This doesn't return an idle worker instance, thus this worker is no longer usable.
IoErr, IoErr(String),
} }
/// Given the idle token of a worker and parameters of work, communicates with the worker and /// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -86,7 +79,7 @@ pub async fn start_work(
artifact_path: PathBuf, artifact_path: PathBuf,
preparation_timeout: Duration, preparation_timeout: Duration,
) -> Outcome { ) -> Outcome {
let IdleWorker { mut stream, pid } = worker; let IdleWorker { stream, pid } = worker;
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -95,7 +88,7 @@ pub async fn start_work(
artifact_path.display(), artifact_path.display(),
); );
with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move { with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -116,7 +109,7 @@ pub async fn start_work(
// load, but the CPU resources of the child can only be measured from the parent after the // load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates. // child process terminates.
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await; let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await;
match result { match result {
// Received bytes from worker within the time limit. // Received bytes from worker within the time limit.
@@ -138,7 +131,7 @@ pub async fn start_work(
"failed to recv a prepare response: {:?}", "failed to recv a prepare response: {:?}",
err, err,
); );
Outcome::IoErr Outcome::IoErr(err.to_string())
}, },
Err(_) => { Err(_) => {
// Timed out here on the host. // Timed out here on the host.
@@ -169,7 +162,7 @@ async fn handle_response_bytes(
// By convention we expect encoded `PrepareResult`. // By convention we expect encoded `PrepareResult`.
let result = match PrepareResult::decode(&mut response_bytes.as_slice()) { let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
Ok(result) => result, Ok(result) => result,
Err(_) => { Err(err) => {
// We received invalid bytes from the worker. // We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!( gum::warn!(
@@ -178,7 +171,7 @@ async fn handle_response_bytes(
"received unexpected response from the prepare worker: {}", "received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes), HexDisplay::from(&bound_bytes),
); );
return Outcome::IoErr return Outcome::IoErr(err.to_string())
}, },
}; };
let cpu_time_elapsed = match result { let cpu_time_elapsed = match result {
@@ -198,11 +191,6 @@ async fn handle_response_bytes(
preparation_timeout.as_millis(), preparation_timeout.as_millis(),
tmp_file.display(), tmp_file.display(),
); );
// Return a timeout error.
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Outcome::TimedOut return Outcome::TimedOut
} }
@@ -214,8 +202,8 @@ async fn handle_response_bytes(
artifact_path.display(), artifact_path.display(),
); );
match async_std::fs::rename(&tmp_file, &artifact_path).await { match tokio::fs::rename(&tmp_file, &artifact_path).await {
Ok(_) => Outcome::Concluded { worker, result }, Ok(()) => Outcome::Concluded { worker, result },
Err(err) => { Err(err) => {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -237,7 +225,7 @@ async fn handle_response_bytes(
async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome
where where
Fut: futures::Future<Output = Outcome>, Fut: futures::Future<Output = Outcome>,
F: FnOnce(PathBuf) -> Fut, F: FnOnce(PathBuf, UnixStream) -> Fut,
{ {
let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await { let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await {
Ok(f) => f, Ok(f) => f,
@@ -255,14 +243,14 @@ where
}, },
}; };
let outcome = f(tmp_file.clone()).await; let outcome = f(tmp_file.clone(), stream).await;
// The function called above is expected to move `tmp_file` to a new location upon success. However, // The function called above is expected to move `tmp_file` to a new location upon success. However,
// the function may also fail, and in that case we should remove the tmp file here. // the function may also fail, and in that case we should remove the tmp file here.
// //
// In any case, we try to remove the file here so that there are no leftovers. We only report // In any case, we try to remove the file here so that there are no leftovers. We only report
// errors that are different from the `NotFound`. // errors that are different from the `NotFound`.
match async_std::fs::remove_file(tmp_file).await { match tokio::fs::remove_file(tmp_file).await {
Ok(()) => (), Ok(()) => (),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => { Err(err) => {
@@ -312,74 +300,78 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf,
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. /// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) { pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |mut stream| async move { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
loop { loop {
let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; let (code, dest, preparation_timeout) = recv_request(&mut stream).await?;
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), worker_pid = %std::process::id(),
"worker: preparing artifact", "worker: preparing artifact",
); );
// Create a lock flag. We set it when either thread finishes. // Used to signal to the cpu time monitor thread that it can finish.
let lock = Arc::new(AtomicBool::new(false)); let (finished_tx, finished_rx) = channel::<()>();
let cpu_time_start = ProcessTime::now(); let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor. Continuously wakes up from // Spawn a new thread that runs the CPU time monitor.
// sleeping and then either sleeps for the remaining CPU time, or kills the process if let thread_fut = rt_handle
// we exceed the CPU timeout. .spawn_blocking(move || {
let (stream_2, cpu_time_start_2, preparation_timeout_2, lock_2) = cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx)
(stream.clone(), cpu_time_start, preparation_timeout, lock.clone()); })
let handle = .fuse();
thread::Builder::new().name("CPU time monitor".into()).spawn(move || { let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse();
task::block_on(async {
cpu_time_monitor_loop(
JobKind::Prepare,
stream_2,
cpu_time_start_2,
preparation_timeout_2,
lock_2,
)
.await;
})
})?;
// Prepares the artifact in a separate thread. pin_mut!(thread_fut);
let result = match prepare_artifact(&code).await { pin_mut!(prepare_fut);
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
},
Ok(compiled_artifact) => {
let cpu_time_elapsed = cpu_time_start.elapsed();
let lock_result = let result = select_biased! {
lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); // If this future is not selected, the join handle is dropped and the thread will
if lock_result.is_err() { // finish in the background.
// The other thread is still sending an error response over the socket. Wait on it and join_res = thread_fut => {
// return. match join_res {
let _ = handle.join(); Ok(Some(cpu_time_elapsed)) => {
// Monitor thread detected timeout and likely already terminated the // Log if we exceed the timeout and the other thread hasn't finished.
// process, nothing to do. gum::warn!(
continue target: LOG_TARGET,
worker_pid = %std::process::id(),
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
);
Err(PrepareError::TimedOut)
},
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
Err(err) => Err(PrepareError::IoErr(err.to_string())),
} }
},
compilation_res = prepare_fut => {
let cpu_time_elapsed = cpu_time_start.elapsed();
let _ = finished_tx.send(());
// Write the serialized artifact into a temp file. match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
// Err(err) => {
// PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored // Serialized error will be written into the socket.
// on the disk (and consequently deserialized by execute-workers). The prepare worker is only Err(err)
// required to send `Ok` to the pool to indicate the success. },
Ok(compiled_artifact) => {
// Write the serialized artifact into a temp file.
//
// PVF host only keeps artifacts statuses in its memory, successfully
// compiled code gets stored on the disk (and consequently deserialized
// by execute-workers). The prepare worker is only required to send `Ok`
// to the pool to indicate the success.
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %std::process::id(), worker_pid = %std::process::id(),
"worker: writing artifact to {}", "worker: writing artifact to {}",
dest.display(), dest.display(),
); );
async_std::fs::write(&dest, &compiled_artifact).await?; tokio::fs::write(&dest, &compiled_artifact).await?;
Ok(cpu_time_elapsed) Ok(cpu_time_elapsed)
},
}
}, },
}; };
@@ -388,7 +380,7 @@ pub fn worker_entrypoint(socket_path: &str) {
}); });
} }
async fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> { fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| { panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(code) { let blob = match crate::executor_intf::prevalidate(code) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
+77 -122
View File
@@ -16,31 +16,26 @@
//! Common logic for implementation of worker processes. //! Common logic for implementation of worker processes.
use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; use crate::LOG_TARGET;
use async_std::{
io,
net::Shutdown,
os::unix::net::{UnixListener, UnixStream},
path::{Path, PathBuf},
};
use cpu_time::ProcessTime; use cpu_time::ProcessTime;
use futures::{ use futures::{never::Never, FutureExt as _};
never::Never, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, FutureExt as _,
};
use futures_timer::Delay; use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::pin_project; use pin_project::pin_project;
use rand::Rng; use rand::Rng;
use std::{ use std::{
fmt, mem, fmt, mem,
path::{Path, PathBuf},
pin::Pin, pin::Pin,
sync::{ sync::mpsc::{Receiver, RecvTimeoutError},
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
}; };
use tokio::{
io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf},
net::{UnixListener, UnixStream},
process,
runtime::{Handle, Runtime},
};
/// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in /// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
/// wall clock time). This is lenient because CPU time may go slower than wall clock time. /// wall clock time). This is lenient because CPU time may go slower than wall clock time.
@@ -50,21 +45,6 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
/// child process. /// child process.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
#[derive(Copy, Clone, Debug)]
pub enum JobKind {
Prepare,
Execute,
}
impl fmt::Display for JobKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Prepare => write!(f, "prepare"),
Self::Execute => write!(f, "execute"),
}
}
}
/// This is publicly exposed only for integration tests. /// This is publicly exposed only for integration tests.
#[doc(hidden)] #[doc(hidden)]
pub async fn spawn_with_program_path( pub async fn spawn_with_program_path(
@@ -77,7 +57,7 @@ pub async fn spawn_with_program_path(
with_transient_socket_path(debug_id, |socket_path| { with_transient_socket_path(debug_id, |socket_path| {
let socket_path = socket_path.to_owned(); let socket_path = socket_path.to_owned();
async move { async move {
let listener = UnixListener::bind(&socket_path).await.map_err(|err| { let listener = UnixListener::bind(&socket_path).map_err(|err| {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
%debug_id, %debug_id,
@@ -132,7 +112,7 @@ where
// Best effort to remove the socket file. Under normal circumstances the socket will be removed // Best effort to remove the socket file. Under normal circumstances the socket will be removed
// by the worker. We make sure that it is removed here, just in case of a failed rendezvous. // by the worker. We make sure that it is removed here, just in case of a failed rendezvous.
let _ = async_std::fs::remove_file(socket_path).await; let _ = tokio::fs::remove_file(socket_path).await;
result result
} }
@@ -163,7 +143,7 @@ pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
for _ in 0..NUM_RETRIES { for _ in 0..NUM_RETRIES {
let candidate_path = tmppath(prefix, dir); let candidate_path = tmppath(prefix, dir);
if !candidate_path.exists().await { if !candidate_path.exists() {
return Ok(candidate_path) return Ok(candidate_path)
} }
} }
@@ -179,28 +159,22 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F) pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F)
where where
F: FnMut(UnixStream) -> Fut, F: FnMut(Handle, UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>, Fut: futures::Future<Output = io::Result<Never>>,
{ {
let err = async_std::task::block_on::<_, io::Result<Never>>(async move { let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let stream = UnixStream::connect(socket_path).await?; let handle = rt.handle();
let _ = async_std::fs::remove_file(socket_path).await; let err = rt
.block_on(async move {
let stream = UnixStream::connect(socket_path).await?;
let _ = tokio::fs::remove_file(socket_path).await;
let result = event_loop(stream.clone()).await; let result = event_loop(handle.clone(), stream).await;
if let Err(err) = stream.shutdown(Shutdown::Both) { result
// Log, but don't return error here, as it may shadow any error from `event_loop`. })
gum::debug!( // It's never `Ok` because it's `Ok(Never)`.
target: LOG_TARGET, .unwrap_err();
"error shutting down stream at path {}: {}",
socket_path,
err
);
}
result
})
.unwrap_err(); // it's never `Ok` because it's `Ok(Never)`
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -209,74 +183,45 @@ where
debug_id, debug_id,
err, err,
); );
// We don't want tokio to wait for the tasks to finish. We want to bring down the worker as fast
// as possible and not wait for stalled validation to finish. This isn't strictly necessary now,
// but may be in the future.
rt.shutdown_background();
} }
/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up /// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error /// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout.
/// if we exceed the CPU timeout.
/// ///
/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the /// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return
/// background. When it wakes, it will see that the flag has been set and return. /// `None` if the other thread finishes first, without us timing out.
pub async fn cpu_time_monitor_loop( ///
job_kind: JobKind, /// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or
mut stream: UnixStream, /// execution, to be killed by the host. We do not kill the process here because it would interfere
/// with the proper handling of this error.
pub fn cpu_time_monitor_loop(
cpu_time_start: ProcessTime, cpu_time_start: ProcessTime,
timeout: Duration, timeout: Duration,
lock: Arc<AtomicBool>, finished_rx: Receiver<()>,
) { ) -> Option<Duration> {
loop { loop {
let cpu_time_elapsed = cpu_time_start.elapsed(); let cpu_time_elapsed = cpu_time_start.elapsed();
// Treat the timeout as CPU time, which is less subject to variance due to load. // Treat the timeout as CPU time, which is less subject to variance due to load.
if cpu_time_elapsed > timeout { if cpu_time_elapsed <= timeout {
let result = lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); // Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
if result.is_err() { // is wall clock time. The CPU clock may be slower than the wall clock.
// Hit the job-completed case first, return from this thread. let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
return match finished_rx.recv_timeout(sleep_interval) {
// Received finish signal.
Ok(()) => return None,
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return None,
} }
// Log if we exceed the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms",
cpu_time_elapsed.as_millis(),
timeout.as_millis(),
);
// Send back a `TimedOut` error.
//
// NOTE: This will cause the worker, whether preparation or execution, to be killed by
// the host. We do not kill the process here because it would interfere with the proper
// handling of this error.
let encoded_result = match job_kind {
JobKind::Prepare => {
let result: Result<(), PrepareError> = Err(PrepareError::TimedOut);
result.encode()
},
JobKind::Execute => {
let result = ExecuteResponse::TimedOut;
result.encode()
},
};
// If we error here there is nothing we can do apart from log it. The receiving side
// will just have to time out.
if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"{job_kind} worker -> pvf host: error sending result over the socket: {:?}",
err
);
}
return
} }
// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep return Some(cpu_time_elapsed)
// is wall clock time. The CPU clock may be slower than the wall clock.
let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD;
std::thread::sleep(sleep_interval);
} }
} }
@@ -317,9 +262,10 @@ pub enum SpawnErr {
/// This future relies on the fact that a child process's stdout `fd` is closed upon its termination. /// This future relies on the fact that a child process's stdout `fd` is closed upon its termination.
#[pin_project] #[pin_project]
pub struct WorkerHandle { pub struct WorkerHandle {
child: async_process::Child, child: process::Child,
child_id: u32,
#[pin] #[pin]
stdout: async_process::ChildStdout, stdout: process::ChildStdout,
program: PathBuf, program: PathBuf,
drop_box: Box<[u8]>, drop_box: Box<[u8]>,
} }
@@ -330,13 +276,16 @@ impl WorkerHandle {
extra_args: &[&str], extra_args: &[&str],
socket_path: impl AsRef<Path>, socket_path: impl AsRef<Path>,
) -> io::Result<Self> { ) -> io::Result<Self> {
let mut child = async_process::Command::new(program.as_ref()) let mut child = process::Command::new(program.as_ref())
.args(extra_args) .args(extra_args)
.arg(socket_path.as_ref().as_os_str()) .arg(socket_path.as_ref().as_os_str())
.stdout(async_process::Stdio::piped()) .stdout(std::process::Stdio::piped())
.kill_on_drop(true) .kill_on_drop(true)
.spawn()?; .spawn()?;
let child_id = child
.id()
.ok_or(io::Error::new(io::ErrorKind::Other, "could not get id of spawned process"))?;
let stdout = child let stdout = child
.stdout .stdout
.take() .take()
@@ -344,6 +293,7 @@ impl WorkerHandle {
Ok(WorkerHandle { Ok(WorkerHandle {
child, child,
child_id,
stdout, stdout,
program: program.as_ref().to_path_buf(), program: program.as_ref().to_path_buf(),
// We don't expect the bytes to be ever read. But in case we do, we should not use a buffer // We don't expect the bytes to be ever read. But in case we do, we should not use a buffer
@@ -361,7 +311,7 @@ impl WorkerHandle {
/// Returns the process id of this worker. /// Returns the process id of this worker.
pub fn id(&self) -> u32 { pub fn id(&self) -> u32 {
self.child.id() self.child_id
} }
} }
@@ -370,15 +320,20 @@ impl futures::Future for WorkerHandle {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project(); let me = self.project();
match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut *me.drop_box)) { // Create a `ReadBuf` here instead of storing it in `WorkerHandle` to avoid a lifetime
Ok(0) => { // parameter on `WorkerHandle`. Creating the `ReadBuf` is fairly cheap.
// 0 means `EOF` means the child was terminated. Resolve. let mut read_buf = ReadBuf::new(&mut *me.drop_box);
Poll::Ready(()) match futures::ready!(AsyncRead::poll_read(me.stdout, cx, &mut read_buf)) {
}, Ok(()) => {
Ok(_bytes_read) => { if read_buf.filled().len() > 0 {
// weird, we've read something. Pretend that never happened and reschedule ourselves. // weird, we've read something. Pretend that never happened and reschedule
cx.waker().wake_by_ref(); // ourselves.
Poll::Pending cx.waker().wake_by_ref();
Poll::Pending
} else {
// Nothing read means `EOF` means the child was terminated. Resolve.
Poll::Ready(())
}
}, },
Err(err) => { Err(err) => {
// The implementation is guaranteed not to return `WouldBlock` or `Interrupted`. This // The implementation is guaranteed not to return `WouldBlock` or `Interrupted`. This
@@ -387,8 +342,8 @@ impl futures::Future for WorkerHandle {
// Log the status code. // Log the status code.
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
worker_pid = %me.child.id(), worker_pid = %me.child_id,
status_code = ?me.child.try_status(), status_code = ?me.child.try_wait().ok().flatten().map(|c| c.to_string()),
"pvf worker ({}): {:?}", "pvf worker ({}): {:?}",
me.program.display(), me.program.display(),
err, err,
+4 -4
View File
@@ -22,7 +22,7 @@ use polkadot_parachain::primitives::{
ValidationParams, ValidationParams,
}; };
#[async_std::test] #[tokio::test]
async fn execute_good_block_on_parent() { async fn execute_good_block_on_parent() {
let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
@@ -50,7 +50,7 @@ async fn execute_good_block_on_parent() {
assert_eq!(new_head.post_state, hash_state(512)); assert_eq!(new_head.post_state, hash_state(512));
} }
#[async_std::test] #[tokio::test]
async fn execute_good_chain_on_parent() { async fn execute_good_chain_on_parent() {
let mut number = 0; let mut number = 0;
let mut parent_hash = [0; 32]; let mut parent_hash = [0; 32];
@@ -88,7 +88,7 @@ async fn execute_good_chain_on_parent() {
} }
} }
#[async_std::test] #[tokio::test]
async fn execute_bad_block_on_parent() { async fn execute_bad_block_on_parent() {
let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
@@ -113,7 +113,7 @@ async fn execute_bad_block_on_parent() {
.unwrap_err(); .unwrap_err();
} }
#[async_std::test] #[tokio::test]
async fn stress_spawn() { async fn stress_spawn() {
let host = std::sync::Arc::new(TestHost::new()); let host = std::sync::Arc::new(TestHost::new());
+21 -7
View File
@@ -14,13 +14,15 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use async_std::sync::Mutex; use assert_matches::assert_matches;
use parity_scale_codec::Encode as _; use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{ use polkadot_node_core_pvf::{
start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost, start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
}; };
use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult};
use std::time::Duration; use std::time::Duration;
use tokio::sync::Mutex;
mod adder; mod adder;
mod worker_common; mod worker_common;
@@ -47,7 +49,7 @@ impl TestHost {
let mut config = Config::new(cache_dir.path().to_owned(), program_path); let mut config = Config::new(cache_dir.path().to_owned(), program_path);
f(&mut config); f(&mut config);
let (host, task) = start(config, Metrics::default()); let (host, task) = start(config, Metrics::default());
let _ = async_std::task::spawn(task); let _ = tokio::task::spawn(task);
Self { _cache_dir: cache_dir, host: Mutex::new(host) } Self { _cache_dir: cache_dir, host: Mutex::new(host) }
} }
@@ -77,10 +79,11 @@ impl TestHost {
} }
} }
#[async_std::test] #[tokio::test]
async fn terminates_on_timeout() { async fn terminates_on_timeout() {
let host = TestHost::new(); let host = TestHost::new();
let start = std::time::Instant::now();
let result = host let result = host
.validate_candidate( .validate_candidate(
halt::wasm_binary_unwrap(), halt::wasm_binary_unwrap(),
@@ -97,10 +100,14 @@ async fn terminates_on_timeout() {
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {}, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {},
r => panic!("{:?}", r), r => panic!("{:?}", r),
} }
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_EXECUTION_TIMEOUT);
assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
} }
#[async_std::test] #[tokio::test]
async fn parallel_execution() { async fn ensure_parallel_execution() {
// Run some jobs that do not complete, thus timing out. // Run some jobs that do not complete, thus timing out.
let host = TestHost::new(); let host = TestHost::new();
let execute_pvf_future_1 = host.validate_candidate( let execute_pvf_future_1 = host.validate_candidate(
@@ -123,7 +130,14 @@ async fn parallel_execution() {
); );
let start = std::time::Instant::now(); let start = std::time::Instant::now();
let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2); let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);
assert_matches!(
(res1, res2),
(
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)),
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout))
)
);
// Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel). // Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
let duration = std::time::Instant::now().duration_since(start); let duration = std::time::Instant::now().duration_since(start);
@@ -136,7 +150,7 @@ async fn parallel_execution() {
); );
} }
#[async_std::test] #[tokio::test]
async fn execute_queue_doesnt_stall_if_workers_died() { async fn execute_queue_doesnt_stall_if_workers_died() {
let host = TestHost::new_with_config(|cfg| { let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 5; cfg.execute_workers_max_num = 5;
@@ -18,7 +18,7 @@ use crate::PUPPET_EXE;
use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr}; use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr};
use std::time::Duration; use std::time::Duration;
#[async_std::test] #[tokio::test]
async fn spawn_timeout() { async fn spawn_timeout() {
let result = let result =
spawn_with_program_path("integration-test", PUPPET_EXE, &["sleep"], Duration::from_secs(2)) spawn_with_program_path("integration-test", PUPPET_EXE, &["sleep"], Duration::from_secs(2))
@@ -26,7 +26,7 @@ async fn spawn_timeout() {
assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); assert!(matches!(result, Err(SpawnErr::AcceptTimeout)));
} }
#[async_std::test] #[tokio::test]
async fn should_connect() { async fn should_connect() {
let _ = spawn_with_program_path( let _ = spawn_with_program_path(
"integration-test", "integration-test",
+1 -1
View File
@@ -227,7 +227,7 @@ pub type UncheckedSignedFullStatement = UncheckedSigned<Statement, CompactStatem
/// Candidate invalidity details /// Candidate invalidity details
#[derive(Debug)] #[derive(Debug)]
pub enum InvalidCandidate { pub enum InvalidCandidate {
/// Failed to execute.`validate_block`. This includes function panicking. /// Failed to execute `validate_block`. This includes function panicking.
ExecutionError(String), ExecutionError(String),
/// Validation outputs check doesn't pass. /// Validation outputs check doesn't pass.
InvalidOutputs, InvalidOutputs,