Use CPU clock timeout for PVF jobs (#6282)

* Put in skeleton logic for CPU-time preparation

Still needed:
- Flesh out logic
- Refactor some spots
- Tests

* Continue filling in logic for prepare worker CPU time changes

* Fix compiler errors

* Update lenience factor

* Fix some clippy lints for PVF module

* Fix compilation errors

* Address some review comments

* Add logging

* Add another log

* Address some review comments; change Mutex to AtomicBool

* Refactor handling response bytes

* Add CPU clock timeout logic for execute jobs

* Properly handle AtomicBool flag

* Use `Ordering::Relaxed`

* Refactor thread coordination logic

* Fix bug

* Add some timing information to execute tests

* Add section about the mitigation to the IG

* minor: Change more `Ordering`s to `Relaxed`

* candidate-validation: Fix build errors
Author: Marcin S
Date: 2022-11-30 07:17:31 -05:00 (committed by GitHub)
Parent: c61860e9be
Commit: 28a4e90912
17 changed files with 536 additions and 170 deletions
+11
@@ -1027,6 +1027,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "cpu-time"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e393a7668fe1fad3075085b86c781883000b4ede868f43627b34a87c8b7ded"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "cpufeatures"
version = "0.2.1"
@@ -6487,6 +6497,7 @@ dependencies = [
"assert_matches",
"async-process",
"async-std",
"cpu-time",
"futures",
"futures-timer",
"hex-literal",
@@ -638,7 +638,7 @@ trait ValidationBackend {
}
}
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
}
#[async_trait]
@@ -664,7 +664,7 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(_) = self.precheck_pvf(pvf, tx).await {
return Err(PrepareError::DidNotMakeIt)
@@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result
}
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
unreachable!()
}
}
@@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
}
struct MockPreCheckBackend {
result: Result<(), PrepareError>,
result: Result<Duration, PrepareError>,
}
impl MockPreCheckBackend {
fn with_hardcoded_result(result: Result<(), PrepareError>) -> Self {
fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self {
Self { result }
}
}
@@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
unreachable!()
}
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
self.result.clone()
}
}
@@ -931,7 +931,7 @@ fn precheck_works() {
let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(())),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
relay_parent,
validation_code_hash,
)
@@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {
let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(())),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
relay_parent,
validation_code_hash,
)
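For illustration, a minimal sketch of a caller consuming the new `precheck_pvf` signature, which now yields the preparation's CPU time on success. It assumes the `ValidationBackend` trait and `Pvf` type from the hunks above; the helper itself is hypothetical and not part of this diff:

```rust
// Hypothetical helper, assuming `ValidationBackend` and `Pvf` are in scope.
async fn precheck_and_report<B: ValidationBackend>(backend: &mut B, pvf: Pvf) {
    match backend.precheck_pvf(pvf).await {
        // On success the backend now reports how much CPU time preparation took.
        Ok(cpu_time_elapsed) =>
            println!("precheck succeeded in {}ms of CPU time", cpu_time_elapsed.as_millis()),
        Err(err) => println!("precheck failed: {:?}", err),
    }
}
```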
+4
@@ -13,6 +13,7 @@ always-assert = "0.1"
async-std = { version = "1.11.0", features = ["attributes"] }
async-process = "1.3.0"
assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
slotmap = "1.0"
@@ -21,10 +22,13 @@ pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
rayon = "1.5.1"
parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }
polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
polkadot-node-metrics = { path = "../../metrics"}
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
+9 -2
@@ -101,6 +101,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
/// The CPU time that was taken preparing this artifact.
cpu_time_elapsed: Duration,
},
/// A task to prepare this artifact is scheduled.
Preparing {
@@ -171,11 +173,16 @@ impl Artifacts {
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
#[cfg(test)]
pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) {
pub fn insert_prepared(
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
cpu_time_elapsed: Duration,
) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed })
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
.is_none());
}
+4 -3
@@ -15,10 +15,11 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use parity_scale_codec::{Decode, Encode};
use std::any::Any;
use std::{any::Any, time::Duration};
/// Result of PVF preparation performed by the validation host.
pub type PrepareResult = Result<(), PrepareError>;
/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// successful.
pub type PrepareResult = Result<Duration, PrepareError>;
/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
+1 -1
@@ -24,4 +24,4 @@ mod queue;
mod worker;
pub use queue::{start, ToQueue};
pub use worker::worker_entrypoint;
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
+2 -1
@@ -225,8 +225,9 @@ fn handle_job_finish(
result_tx: ResultSender,
) {
let (idle_worker, result) = match outcome {
Outcome::Ok { result_descriptor, duration_ms: _, idle_worker } => {
Outcome::Ok { result_descriptor, duration: _, idle_worker } => {
// TODO: propagate the soft timeout
(Some(idle_worker), Ok(result_descriptor))
},
Outcome::InvalidCandidate { err, idle_worker } => (
+109 -21
@@ -18,8 +18,9 @@ use crate::{
artifacts::ArtifactPathId,
executor_intf::Executor,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
@@ -27,12 +28,21 @@ use async_std::{
io,
os::unix::net::UnixStream,
path::{Path, PathBuf},
task,
};
use cpu_time::ProcessTime;
use futures::FutureExt;
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult;
use std::time::{Duration, Instant};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
@@ -48,7 +58,7 @@ pub async fn spawn(
pub enum Outcome {
/// PVF execution completed successfully and the result is returned. The worker is ready for
/// another job.
Ok { result_descriptor: ValidationResult, duration_ms: u64, idle_worker: IdleWorker },
Ok { result_descriptor: ValidationResult, duration: Duration, idle_worker: IdleWorker },
/// The candidate validation failed. It may be for example because the wasm execution triggered a trap.
/// Errors related to the preparation process are not expected to be encountered by the execution workers.
InvalidCandidate { err: String, idle_worker: IdleWorker },
@@ -80,7 +90,9 @@ pub async fn start_work(
artifact.path.display(),
);
if let Err(error) = send_request(&mut stream, &artifact.path, &validation_params).await {
if let Err(error) =
send_request(&mut stream, &artifact.path, &validation_params, execution_timeout).await
{
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
@@ -91,6 +103,12 @@ pub async fn start_work(
return Outcome::IoErr
}
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = execution_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let response = futures::select! {
response = recv_response(&mut stream).fuse() => {
match response {
@@ -104,25 +122,47 @@ pub async fn start_work(
);
return Outcome::IoErr
},
Ok(response) => response,
Ok(response) => {
if let Response::Ok{duration, ..} = response {
if duration > execution_timeout {
// The job didn't complete within the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"execute job took {}ms cpu time, exceeded execution timeout {}ms.",
duration.as_millis(),
execution_timeout.as_millis(),
);
// Return a timeout error.
return Outcome::HardTimeout;
}
}
response
},
}
},
_ = Delay::new(execution_timeout).fuse() => {
_ = Delay::new(timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded alloted time for execution",
);
return Outcome::HardTimeout;
// TODO: This case is not really a hard timeout as the timeout here in the host is
// lenient. Should fix this as part of
// https://github.com/paritytech/polkadot/issues/3754.
Response::TimedOut
},
};
match response {
Response::Ok { result_descriptor, duration_ms } =>
Outcome::Ok { result_descriptor, duration_ms, idle_worker: IdleWorker { stream, pid } },
Response::Ok { result_descriptor, duration } =>
Outcome::Ok { result_descriptor, duration, idle_worker: IdleWorker { stream, pid } },
Response::InvalidCandidate(err) =>
Outcome::InvalidCandidate { err, idle_worker: IdleWorker { stream, pid } },
Response::TimedOut => Outcome::HardTimeout,
Response::InternalError(err) =>
Outcome::InternalError { err, idle_worker: IdleWorker { stream, pid } },
}
@@ -132,12 +172,14 @@ async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
validation_params: &[u8],
execution_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, path_to_bytes(artifact_path)).await?;
framed_send(stream, validation_params).await
framed_send(stream, validation_params).await?;
framed_send(stream, &execution_timeout.encode()).await
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>)> {
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>, Duration)> {
let artifact_path = framed_recv(stream).await?;
let artifact_path = bytes_to_path(&artifact_path).ok_or_else(|| {
io::Error::new(
@@ -146,7 +188,14 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(PathBuf, Vec<u8>)>
)
})?;
let params = framed_recv(stream).await?;
Ok((artifact_path, params))
let execution_timeout = framed_recv(stream).await?;
let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_request: failed to decode duration".to_string(),
)
})?;
Ok((artifact_path, params, execution_timeout))
}
async fn send_response(stream: &mut UnixStream, response: Response) -> io::Result<()> {
@@ -164,9 +213,10 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
}
#[derive(Encode, Decode)]
enum Response {
Ok { result_descriptor: ValidationResult, duration_ms: u64 },
pub enum Response {
Ok { result_descriptor: ValidationResult, duration: Duration },
InvalidCandidate(String),
TimedOut,
InternalError(String),
}
@@ -187,15 +237,53 @@ pub fn worker_entrypoint(socket_path: &str) {
let executor = Executor::new().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?;
loop {
let (artifact_path, params) = recv_request(&mut stream).await?;
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: validating artifact {}",
artifact_path.display(),
);
let response = validate_using_artifact(&artifact_path, &params, &executor).await;
// Create a lock flag. We set it when either thread finishes.
let lock = Arc::new(AtomicBool::new(false));
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor. Continuously wakes up from
// sleeping and then either sleeps for the remaining CPU time, or kills the process if
// we exceed the CPU timeout.
let (stream_2, cpu_time_start_2, execution_timeout_2, lock_2) =
(stream.clone(), cpu_time_start, execution_timeout, lock.clone());
let handle =
thread::Builder::new().name("CPU time monitor".into()).spawn(move || {
task::block_on(async {
cpu_time_monitor_loop(
JobKind::Execute,
stream_2,
cpu_time_start_2,
execution_timeout_2,
lock_2,
)
.await;
})
})?;
let response =
validate_using_artifact(&artifact_path, &params, &executor, cpu_time_start).await;
let lock_result =
lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
if lock_result.is_err() {
// The other thread is still sending an error response over the socket. Wait on it
// and return.
let _ = handle.join();
// Monitor thread detected timeout and likely already terminated the process,
// nothing to do.
continue
}
send_response(&mut stream, response).await?;
}
});
@@ -205,19 +293,19 @@ async fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
executor: &Executor,
cpu_time_start: ProcessTime,
) -> Response {
let validation_started_at = Instant::now();
let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
// [`executor_intf::prepare`].
executor.execute(artifact_path.as_ref(), params)
} {
Err(err) => return Response::format_invalid("execute", &err.to_string()),
Err(err) => return Response::format_invalid("execute", &err),
Ok(d) => d,
};
let duration_ms = validation_started_at.elapsed().as_millis() as u64;
let duration = cpu_time_start.elapsed();
let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) =>
@@ -225,5 +313,5 @@ async fn validate_using_artifact(
Ok(r) => r,
};
Response::Ok { result_descriptor, duration_ms }
Response::Ok { result_descriptor, duration }
}
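Both workers coordinate the job thread and the "CPU time monitor" thread through an `AtomicBool`: whichever side wins a `compare_exchange` on the flag is responsible for responding. A self-contained sketch of that race follows; the timings and `println!`s are illustrative stand-ins for the real socket responses:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

fn main() {
    // The flag starts false; whichever thread flips it first wins the race.
    let finished = Arc::new(AtomicBool::new(false));
    let flag = finished.clone();

    // Stand-in for the CPU time monitor: pretend the timeout fires after 10ms.
    let monitor = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        if flag.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
            // Won the race: the monitor reports the timeout (the real worker
            // sends a TimedOut response over the socket and exits the process).
            println!("monitor: job timed out");
        }
    });

    // Stand-in for the job thread: finishes after 5ms.
    thread::sleep(Duration::from_millis(5));
    if finished.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
        // Won the race: the job thread sends the normal response.
        println!("job: completed before the timeout");
    } else {
        // Lost the race: the monitor already responded, nothing to do.
        println!("job: monitor fired first, nothing to do");
    }

    let _ = monitor.join();
}
```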
+35 -19
@@ -218,7 +218,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
);
let (to_execute_queue_tx, run_execute_queue) = execute::start(
metrics.clone(),
metrics,
config.execute_worker_program_path.to_owned(),
config.execute_workers_max_num,
config.execute_worker_spawn_timeout,
@@ -443,7 +443,7 @@ async fn handle_to_host(
/// Handles PVF prechecking requests.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]).
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]).
///
/// If the prepare job failed previously, we may retry it under certain conditions.
async fn handle_precheck_pvf(
@@ -456,9 +456,9 @@ async fn handle_precheck_pvf(
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed } => {
ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(()));
let _ = result_sender.send(Ok(*cpu_time_elapsed));
},
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
@@ -490,7 +490,7 @@ async fn handle_precheck_pvf(
///
/// If the prepare job failed previously, we may retry it under certain conditions.
///
/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`])
/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_PREPARATION_TIMEOUT`])
/// than when prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
@@ -505,7 +505,7 @@ async fn handle_execute_pvf(
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed } => {
ArtifactState::Prepared { last_time_needed, .. } => {
*last_time_needed = SystemTime::now();
// This artifact has already been prepared, send it to the execute queue.
@@ -563,7 +563,7 @@ async fn handle_execute_pvf(
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}
return Ok(())
Ok(())
}
async fn handle_heads_up(
@@ -701,11 +701,12 @@ async fn handle_prepare_done(
}
*state = match result {
Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() },
Ok(cpu_time_elapsed) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
Err(error) => ArtifactState::FailedToProcess {
last_time_failed: SystemTime::now(),
num_failures: *num_failures + 1,
error: error.clone(),
error,
},
};
@@ -780,7 +781,7 @@ fn can_retry_prepare_after_failure(
// Gracefully returned an error, so it will probably be reproducible. Don't retry.
Prevalidation(_) | Preparation(_) => false,
// Retry if the retry cooldown has elapsed and if we have already retried less than
// `NUM_PREPARE_RETRIES` times.
// `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
Panic(_) | TimedOut | DidNotMakeIt =>
SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
num_failures <= NUM_PREPARE_RETRIES,
@@ -1016,8 +1017,8 @@ mod tests {
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder.artifacts.insert_prepared(artifact_id(1), mock_now);
builder.artifacts.insert_prepared(artifact_id(2), mock_now);
builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
let mut test = builder.build();
let mut host = test.host_handle();
@@ -1087,7 +1088,10 @@ mod tests {
);
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();
let result_tx_pvf_1_1 = assert_matches!(
@@ -1100,7 +1104,10 @@ mod tests {
);
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
})
.await
.unwrap();
let result_tx_pvf_2 = assert_matches!(
@@ -1149,13 +1156,16 @@ mod tests {
);
// Send `Ok` right away and poll the host.
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();
// No pending execute requests.
test.poll_ensure_to_execute_queue_is_empty().await;
// Received the precheck result.
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
// Send multiple requests for the same PVF.
let mut precheck_receivers = Vec::new();
@@ -1253,7 +1263,10 @@ mod tests {
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
})
.await
.unwrap();
// The execute queue receives a new request, prechecking is finished and we can
@@ -1263,7 +1276,7 @@ mod tests {
execute::ToQueue::Enqueue { .. }
);
for result_rx in precheck_receivers {
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
}
}
@@ -1511,7 +1524,10 @@ mod tests {
);
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();
+27 -13
@@ -364,16 +364,14 @@ async fn handle_worker_concluded(
// the pool up to the hard cap.
spawn_extra_worker(queue, false).await?;
}
} else if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
// We no longer need services of this worker. Kill it.
queue.workers.remove(worker);
send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
} else {
if queue.limits.should_cull(queue.workers.len() + queue.spawn_inflight) {
// We no longer need services of this worker. Kill it.
queue.workers.remove(worker);
send_pool(&mut queue.to_pool_tx, pool::ToPool::Kill(worker)).await?;
} else {
// see if there is more work available and schedule it.
if let Some(job) = queue.unscheduled.next() {
assign(queue, worker, job).await?;
}
// see if there is more work available and schedule it.
if let Some(job) = queue.unscheduled.next() {
assign(queue, worker, job).await?;
}
}
@@ -618,7 +616,11 @@ mod tests {
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w,
rip: false,
result: Ok(Duration::default()),
});
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}
@@ -647,7 +649,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: false,
result: Ok(Duration::default()),
});
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
@@ -693,7 +699,11 @@ mod tests {
// That's a bit silly in this context, but in production there will be an entire pool up
// to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
// we just check that edge case of an edge case works.
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: false,
result: Ok(Duration::default()),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}
@@ -719,7 +729,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
// Conclude worker 1 and rip it.
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Ok(Duration::default()),
});
// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
+190 -83
@@ -18,8 +18,9 @@ use crate::{
artifacts::CompiledArtifact,
error::{PrepareError, PrepareResult},
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr,
WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
@@ -27,10 +28,20 @@ use async_std::{
io,
os::unix::net::UnixStream,
path::{Path, PathBuf},
task,
};
use cpu_time::ProcessTime;
use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{panic, sync::Arc, time::Duration};
use std::{
panic,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
@@ -58,6 +69,13 @@ pub enum Outcome {
DidNotMakeIt,
}
#[derive(Debug)]
enum Selected {
Done(PrepareResult),
IoErr,
Deadline,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
pub async fn start_work(
@@ -77,7 +95,7 @@ pub async fn start_work(
);
with_tmp_file(pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
@@ -88,78 +106,52 @@ pub async fn start_work(
}
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines.
// worker may get killed, or something along these lines. In that case we should propagate
// the error to the pool.
//
// In that case we should propagate the error to the pool.
// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
// load, but the CPU resources of the child can only be measured from the parent after the
// child process terminates.
let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;
#[derive(Debug)]
enum Selected {
Done(PrepareResult),
IoErr,
Deadline,
}
let selected =
match async_std::future::timeout(preparation_timeout, framed_recv(&mut stream)).await {
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
if result.is_ok() {
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);
async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
} else {
Selected::Done(result)
}
} else {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
}
},
Ok(Err(err)) => {
// Communication error within the time limit.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
err,
);
Selected::IoErr
},
Err(_) => {
// Timed out.
Selected::Deadline
},
};
let selected = match result {
// Received bytes from worker within the time limit.
Ok(Ok(response_bytes)) =>
handle_response_bytes(
response_bytes,
pid,
tmp_file,
artifact_path,
preparation_timeout,
)
.await,
Ok(Err(err)) => {
// Communication error within the time limit.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
err,
);
Selected::IoErr
},
Err(_) => {
// Timed out here on the host.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"did not recv a prepare response within the time limit",
);
Selected::Deadline
},
};
match selected {
// Timed out on the child. This should already be logged by the child.
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Selected::Deadline => Outcome::TimedOut,
@@ -169,6 +161,76 @@ pub async fn start_work(
.await
}
/// Handles the case where we successfully received response bytes on the host from the child.
async fn handle_response_bytes(
response_bytes: Vec<u8>,
pid: u32,
tmp_file: PathBuf,
artifact_path: PathBuf,
preparation_timeout: Duration,
) -> Selected {
// By convention we expect encoded `PrepareResult`.
let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
Ok(result) => result,
Err(_) => {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
return Selected::IoErr
},
};
let cpu_time_elapsed = match result {
Ok(result) => result,
Err(_) => return Selected::Done(result),
};
if cpu_time_elapsed > preparation_timeout {
// The job didn't complete within the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
tmp_file.display(),
);
// Return a timeout error.
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Selected::Deadline
}
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);
async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
}
/// Create a temporary file for an artifact at the given cache path and execute the given
/// future/closure passing the file path in.
///
@@ -218,13 +280,15 @@ async fn send_request(
stream: &mut UnixStream,
code: Arc<Vec<u8>>,
tmp_file: &Path,
preparation_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &code).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
framed_send(stream, &preparation_timeout.encode()).await?;
Ok(())
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)> {
async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, Duration)> {
let code = framed_recv(stream).await?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
@@ -233,7 +297,14 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
Ok((code, tmp_file))
let preparation_timeout = framed_recv(stream).await?;
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: failed to decode duration".to_string(),
)
})?;
Ok((code, tmp_file, preparation_timeout))
}
/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
@@ -241,7 +312,7 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |mut stream| async move {
loop {
let (code, dest) = recv_request(&mut stream).await?;
let (code, dest, preparation_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
@@ -249,18 +320,54 @@ pub fn worker_entrypoint(socket_path: &str) {
"worker: preparing artifact",
);
let result = match prepare_artifact(&code) {
// Create a lock flag. We set it when either thread finishes.
let lock = Arc::new(AtomicBool::new(false));
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor. Continuously wakes up from
// sleeping and then either sleeps for the remaining CPU time, or kills the process if
// we exceed the CPU timeout.
let (stream_2, cpu_time_start_2, preparation_timeout_2, lock_2) =
(stream.clone(), cpu_time_start, preparation_timeout, lock.clone());
let handle =
thread::Builder::new().name("CPU time monitor".into()).spawn(move || {
task::block_on(async {
cpu_time_monitor_loop(
JobKind::Prepare,
stream_2,
cpu_time_start_2,
preparation_timeout_2,
lock_2,
)
.await;
})
})?;
// Prepares the artifact in a separate thread.
let result = match prepare_artifact(&code).await {
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
},
Ok(compiled_artifact) => {
let cpu_time_elapsed = cpu_time_start.elapsed();
let lock_result =
lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
if lock_result.is_err() {
// The other thread is still sending an error response over the socket. Wait on it and
// return.
let _ = handle.join();
// Monitor thread detected timeout and likely already terminated the
// process, nothing to do.
continue
}
// Write the serialized artifact into a temp file.
// PVF host only keeps artifacts statuses in its memory,
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare
// worker is only required to send an empty `Ok` to the pool
// to indicate the success.
//
// PVF host only keeps artifacts statuses in its memory, successfully compiled code gets stored
// on the disk (and consequently deserialized by execute-workers). The prepare worker is only
// required to send `Ok` to the pool to indicate the success.
gum::debug!(
target: LOG_TARGET,
@@ -270,7 +377,7 @@ pub fn worker_entrypoint(socket_path: &str) {
);
async_std::fs::write(&dest, &compiled_artifact).await?;
Ok(())
Ok(cpu_time_elapsed)
},
};
@@ -279,7 +386,7 @@ pub fn worker_entrypoint(socket_path: &str) {
});
}
fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
async fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(code) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
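The timeout now travels over the worker socket as a SCALE-encoded `Duration` (see `send_request`/`recv_request` in both the execute and prepare workers above). A minimal round-trip sketch using `parity-scale-codec`:

```rust
use parity_scale_codec::{Decode, Encode};
use std::time::Duration;

fn main() {
    let preparation_timeout = Duration::from_secs(60);

    // Sender side (`send_request`): SCALE-encode the timeout into a frame.
    let bytes = preparation_timeout.encode();

    // Receiver side (`recv_request`): decode it back out of the frame.
    let decoded = Duration::decode(&mut &bytes[..]).expect("valid encoding");
    assert_eq!(decoded, preparation_timeout);
}
```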
+100 -3
@@ -16,25 +16,54 @@
//! Common logic for implementation of worker processes.
use crate::LOG_TARGET;
use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET};
use async_std::{
io,
os::unix::net::{UnixListener, UnixStream},
path::{Path, PathBuf},
};
use cpu_time::ProcessTime;
use futures::{
never::Never, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, FutureExt as _,
};
use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::pin_project;
use rand::Rng;
use std::{
fmt, mem,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
/// A multiple of the job timeout (in CPU time) for which we are willing to wait on the host (in
/// wall clock time). This is lenient because CPU time may go slower than wall clock time.
pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
#[derive(Copy, Clone, Debug)]
pub enum JobKind {
Prepare,
Execute,
}
impl fmt::Display for JobKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Prepare => write!(f, "prepare"),
Self::Execute => write!(f, "execute"),
}
}
}
/// This is publicly exposed only for integration tests.
#[doc(hidden)]
pub async fn spawn_with_program_path(
@@ -169,6 +198,74 @@ where
);
}
/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
/// from sleeping and then either sleeps for the remaining CPU time, or kills the process if we
/// exceed the CPU timeout.
///
/// NOTE: Killed processes are detected and cleaned up in `purge_dead`.
///
/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the
/// background. When it wakes, it will see that the flag has been set and return.
pub async fn cpu_time_monitor_loop(
job_kind: JobKind,
mut stream: UnixStream,
cpu_time_start: ProcessTime,
timeout: Duration,
lock: Arc<AtomicBool>,
) {
loop {
let cpu_time_elapsed = cpu_time_start.elapsed();
// Treat the timeout as CPU time, which is less subject to variance due to load.
if cpu_time_elapsed > timeout {
let result = lock.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
if result.is_err() {
// Hit the job-completed case first, return from this thread.
return
}
// Log if we exceed the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms",
cpu_time_elapsed.as_millis(),
timeout.as_millis(),
);
// Send back a TimedOut error on timeout.
let encoded_result = match job_kind {
JobKind::Prepare => {
let result: Result<(), PrepareError> = Err(PrepareError::TimedOut);
result.encode()
},
JobKind::Execute => {
let result = ExecuteResponse::TimedOut;
result.encode()
},
};
// If we error there is nothing else we can do here, and we are killing the process
// anyway. The receiving side will just have to time out.
if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await {
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"{job_kind} worker -> pvf host: error sending result over the socket: {:?}",
err
);
}
// Kill the process.
std::process::exit(1);
}
// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
// is wall clock time. The CPU clock may be slower than the wall clock.
let sleep_interval = timeout - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD;
std::thread::sleep(sleep_interval);
}
}
/// A struct that represents an idle worker.
///
/// This struct is supposed to be used as a token that is passed by move into a subroutine that
@@ -200,8 +297,8 @@ pub enum SpawnErr {
/// This is a representation of a potentially running worker. Drop it and the process will be killed.
///
/// A worker's handle is also a future that resolves when it's detected that the worker's process
/// has been terminated. Since the worker is running in another process it is obviously not necessary
/// to poll this future to make the worker run, it's only for termination detection.
/// has been terminated. Since the worker is running in another process it is obviously not
/// necessary to poll this future to make the worker run, it's only for termination detection.
///
/// This future relies on the fact that a child process's stdout `fd` is closed upon its termination.
#[pin_project]
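The two constants above drive the timeout arithmetic on both sides of the socket: the host waits a lenient wall-clock multiple of the CPU budget (`JOB_TIMEOUT_WALL_CLOCK_FACTOR`), while the monitor thread sleeps for the CPU time still unspent plus a small allowance (`JOB_TIMEOUT_OVERHEAD`). A worked sketch of that arithmetic, with an assumed 2-second CPU budget:

```rust
use std::time::Duration;

const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);

fn main() {
    let cpu_budget = Duration::from_secs(2); // assumed job timeout

    // Host side: wait in wall clock time for a multiple of the CPU budget,
    // since a loaded machine burns CPU time slower than wall time.
    let host_deadline = cpu_budget * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
    assert_eq!(host_deadline, Duration::from_secs(8));

    // Monitor side: after waking, sleep again for the CPU time still unspent,
    // padded with a small overhead allowance.
    let cpu_time_elapsed = Duration::from_millis(1500);
    let sleep_interval = cpu_budget - cpu_time_elapsed + JOB_TIMEOUT_OVERHEAD;
    assert_eq!(sleep_interval, Duration::from_millis(550));
}
```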
+2 -2
@@ -23,7 +23,7 @@ use polkadot_parachain::primitives::{
};
#[async_std::test]
async fn execute_good_on_parent() {
async fn execute_good_block_on_parent() {
let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
let block_data = BlockData { state: 0, add: 512 };
@@ -89,7 +89,7 @@ async fn execute_good_chain_on_parent() {
}
#[async_std::test]
async fn execute_bad_on_parent() {
async fn execute_bad_block_on_parent() {
let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
let block_data = BlockData {
+20 -4
@@ -101,6 +101,7 @@ async fn terminates_on_timeout() {
#[async_std::test]
async fn parallel_execution() {
// Run some jobs that do not complete, thus timing out.
let host = TestHost::new();
let execute_pvf_future_1 = host.validate_candidate(
halt::wasm_binary_unwrap(),
@@ -124,11 +125,14 @@ async fn parallel_execution() {
let start = std::time::Instant::now();
let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);
// total time should be < 2 x EXECUTION_TIMEOUT_SEC
const EXECUTION_TIMEOUT_SEC: u64 = 3;
// Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
assert!(
std::time::Instant::now().duration_since(start) <
std::time::Duration::from_secs(EXECUTION_TIMEOUT_SEC * 2)
duration < max_duration,
"Expected duration {}ms to be less than {}ms",
duration.as_millis(),
max_duration.as_millis()
);
}
@@ -141,6 +145,7 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
// first five jobs should time out and the workers be killed. For the next 3 jobs a new batch of
// workers should be spun up.
let start = std::time::Instant::now();
futures::future::join_all((0u8..=8).map(|_| {
host.validate_candidate(
halt::wasm_binary_unwrap(),
@@ -153,4 +158,15 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
)
}))
.await;
// Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that should
// both timeout).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
assert!(
duration >= max_duration,
"Expected duration {}ms to be greater than or equal to {}ms",
duration.as_millis(),
max_duration.as_millis()
);
}
@@ -77,10 +77,18 @@ time they can take. As the time for a job can vary depending on the machine and
load on the machine, this can potentially lead to disputes where some validators
successfully execute a PVF and others don't.
One mitigation we have in place is a more lenient timeout for preparation during
execution than during pre-checking. The rationale is that the PVF has already
passed pre-checking, so we know it should be valid, and we allow it to take
longer than expected, as this is likely due to an issue with the machine and not
the PVF.
One dispute mitigation we have in place is a more lenient timeout for
preparation during execution than during pre-checking. The rationale is that the
PVF has already passed pre-checking, so we know it should be valid, and we allow
it to take longer than expected, as this is likely due to an issue with the
machine and not the PVF.
#### CPU clock timeouts
Another timeout-related mitigation we employ is to measure the time taken by
jobs using CPU time, rather than wall clock time. This is because the CPU time
of a process is less variable under different system conditions. When the
overall system is under heavy load, the wall clock time of a job is affected
more than the CPU time.
[CVM]: ../../types/overseer-protocol.md#validationrequesttype
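To see the difference the guide describes, the two clocks can be compared directly. A small sketch using the `cpu-time` crate, where the sleep stands in for a job stalled on a heavily loaded machine:

```rust
use cpu_time::ProcessTime;
use std::time::{Duration, Instant};

fn main() {
    let wall_start = Instant::now();
    let cpu_start = ProcessTime::now();

    // Sleeping consumes wall clock time but essentially no CPU time.
    std::thread::sleep(Duration::from_millis(200));

    println!("wall clock: {:?}", wall_start.elapsed()); // roughly 200ms
    println!("cpu time:   {:?}", cpu_start.elapsed()); // close to zero
}
```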
+1 -5
@@ -108,11 +108,7 @@ where
+ Sync
+ 'static,
C::Api: frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
C::Api: mmr_rpc::MmrRuntimeApi<
Block,
<Block as sp_runtime::traits::Block>::Hash,
BlockNumber,
>,
C::Api: mmr_rpc::MmrRuntimeApi<Block, <Block as sp_runtime::traits::Block>::Hash, BlockNumber>,
C::Api: pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>,
C::Api: BabeApi<Block>,
C::Api: BlockBuilder<Block>,