Separate preparation timeouts for PVF prechecking and execution (#6139)

* Add some documentation

* Add `compilation_timeout` parameter for PVF preparation job

* Update buckets in prometheus metrics

* Update prepare/queue tests

* Update pvf-prechecking overview in implementer docs

* Fix some CI checks
This commit is contained in:
Marcin S
2022-10-13 07:00:57 -04:00
committed by GitHub
parent b3532393b8
commit 17730b85be
8 changed files with 164 additions and 53 deletions
+11 -3
View File
@@ -61,7 +61,12 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received.
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
compilation_timeout: Duration,
},
}
/// A message sent from pool to its client.
@@ -205,7 +210,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path } => {
ToPool::StartWork { worker, code, artifact_path, compilation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
@@ -216,6 +221,7 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
compilation_timeout,
preparation_timer,
)
.boxed(),
@@ -263,9 +269,11 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
compilation_timeout: Duration,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, compilation_timeout).await;
PoolEvent::StartWork(worker, outcome)
}
+66 -23
View File
@@ -21,7 +21,10 @@ use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pv
use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
time::Duration,
};
/// A request to pool.
#[derive(Debug)]
@@ -30,7 +33,7 @@ pub enum ToQueue {
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
Enqueue { priority: Priority, pvf: Pvf, compilation_timeout: Duration },
}
/// A response from queue.
@@ -76,6 +79,8 @@ struct JobData {
/// The priority of this job. Can be bumped.
priority: Priority,
pvf: Pvf,
/// The timeout for the preparation job.
compilation_timeout: Duration,
worker: Option<Worker>,
}
@@ -91,7 +96,7 @@ impl WorkerData {
}
/// A queue structured like this is prone to starving, however, we don't care that much since we expect
/// there is going to be a limited number of critical jobs and we don't really care if background starve.
/// there is going to be a limited number of critical jobs and we don't really care if background starve.
#[derive(Default)]
struct Unscheduled {
normal: VecDeque<Job>,
@@ -203,18 +208,24 @@ impl Queue {
async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
match to_queue {
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
ToQueue::Enqueue { priority, pvf, compilation_timeout } => {
handle_enqueue(queue, priority, pvf, compilation_timeout).await?;
},
}
Ok(())
}
async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> {
async fn handle_enqueue(
queue: &mut Queue,
priority: Priority,
pvf: Pvf,
compilation_timeout: Duration,
) -> Result<(), Fatal> {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash,
?priority,
?compilation_timeout,
"PVF is enqueued for preparation.",
);
queue.metrics.prepare_enqueued();
@@ -225,7 +236,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu
"second Enqueue sent for a known artifact"
) {
// This function is called in response to a `Enqueue` message;
// Precondtion for `Enqueue` is that it is sent only once for a PVF;
// Precondition for `Enqueue` is that it is sent only once for a PVF;
// Thus this should always be `false`;
// qed.
gum::warn!(
@@ -236,7 +247,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu
return Ok(())
}
let job = queue.jobs.insert(JobData { priority, pvf, worker: None });
let job = queue.jobs.insert(JobData { priority, pvf, compilation_timeout, worker: None });
queue.artifact_id_to_job.insert(artifact_id, job);
if let Some(available) = find_idle_worker(queue) {
@@ -424,7 +435,12 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path },
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
compilation_timeout: job_data.compilation_timeout,
},
)
.await?;
@@ -478,7 +494,7 @@ pub fn start(
#[cfg(test)]
mod tests {
use super::*;
use crate::error::PrepareError;
use crate::{error::PrepareError, host::PRECHECK_COMPILATION_TIMEOUT};
use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt};
use slotmap::SlotMap;
@@ -571,7 +587,6 @@ mod tests {
async fn poll_ensure_to_pool_is_empty(&mut self) {
use futures_timer::Delay;
use std::time::Duration;
let to_pool_rx = &mut self.to_pool_rx;
run_until(
@@ -594,7 +609,11 @@ mod tests {
async fn properly_concludes() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w = test.workers.insert(());
@@ -607,10 +626,12 @@ mod tests {
#[async_std::test]
async fn dont_spawn_over_soft_limit_unless_critical() {
let mut test = Test::new(2, 3);
let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT;
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(2) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(3) });
let priority = Priority::Normal;
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout });
// Receive only two spawns.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -631,7 +652,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
// Enqueue a critical job.
test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, pvf: pvf(4) });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: pvf(4),
compilation_timeout,
});
// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
// another worker is warranted.
@@ -641,15 +666,24 @@ mod tests {
#[async_std::test]
async fn cull_unwanted() {
let mut test = Test::new(1, 2);
let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT;
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w1));
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
// Enqueue a critical job, which warrants spawning over the soft limit.
test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, pvf: pvf(2) });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: pvf(2),
compilation_timeout,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
// However, before the new worker had a chance to spawn, the first worker finishes with its
@@ -667,9 +701,10 @@ mod tests {
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(2) });
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(3) });
let (priority, compilation_timeout) = (Priority::Normal, PRECHECK_COMPILATION_TIMEOUT);
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -696,7 +731,11 @@ mod tests {
async fn doesnt_resurrect_ripped_worker_if_no_work() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -717,7 +756,11 @@ mod tests {
async fn rip_for_start_work() {
let mut test = Test::new(2, 2);
test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) });
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
+2 -5
View File
@@ -32,10 +32,6 @@ use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{panic, sync::Arc, time::Duration};
/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
/// The program should be able to handle `<program-path> prepare-worker <socket-path>` invocation.
@@ -69,6 +65,7 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
compilation_timeout: Duration,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
@@ -103,7 +100,7 @@ pub async fn start_work(
}
let selected =
match async_std::future::timeout(COMPILATION_TIMEOUT, framed_recv(&mut stream)).await {
match async_std::future::timeout(compilation_timeout, framed_recv(&mut stream)).await {
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.