Make candidate validation timeouts configurable (#4001)

* pvf: make execution timeout configurable

* guide: add timeouts to candidate validation params

* add timeouts to candidate validation messages

* fmt

* port backing to use the backing pvf timeout

* port approval-voting to use the execution timeout

* port dispute participation to use the correct timeout

* fmt

* address grumbles & test failure
This commit is contained in:
Robert Habermeier
2021-10-04 16:53:36 +02:00
committed by GitHub
parent 114e757988
commit 6002865874
19 changed files with 192 additions and 62 deletions
+16 -4
View File
@@ -38,11 +38,17 @@ slotmap::new_key_type! { struct Worker; }
#[derive(Debug)]
pub enum ToQueue {
Enqueue { artifact: ArtifactPathId, params: Vec<u8>, result_tx: ResultSender },
Enqueue {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
result_tx: ResultSender,
},
}
/// A single execution request, built from `ToQueue::Enqueue` and handed to `assign`.
struct ExecuteJob {
/// The artifact to execute; passed to `start_work`, and its `id` is reported back in
/// `QueueEvent::StartWork`.
artifact: ArtifactPathId,
/// Per-job execution timeout, forwarded to `start_work`.
execution_timeout: Duration,
/// Parameters handed to `start_work` together with the artifact.
params: Vec<u8>,
/// Sender carried with the job into `QueueEvent::StartWork` after `start_work` returns.
result_tx: ResultSender,
}
@@ -167,14 +173,14 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, params, result_tx } = to_queue;
let ToQueue::Enqueue { artifact, execution_timeout, params, result_tx } = to_queue;
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob { artifact, params, result_tx };
let job = ExecuteJob { artifact, execution_timeout, params, result_tx };
if let Some(available) = queue.workers.find_available() {
assign(queue, available, job);
@@ -326,7 +332,13 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await;
let outcome = super::worker::start_work(
idle,
job.artifact.clone(),
job.execution_timeout,
job.params,
)
.await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
.boxed(),
+2 -3
View File
@@ -34,8 +34,6 @@ use parity_scale_codec::{Decode, Encode};
use polkadot_parachain::primitives::ValidationResult;
use std::time::{Duration, Instant};
const EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
/// The program should be able to handle `<program-path> execute-worker <socket-path>` invocation.
@@ -69,6 +67,7 @@ pub enum Outcome {
pub async fn start_work(
worker: IdleWorker,
artifact: ArtifactPathId,
execution_timeout: Duration,
validation_params: Vec<u8>,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
@@ -108,7 +107,7 @@ pub async fn start_work(
Ok(response) => response,
}
},
_ = Delay::new(EXECUTION_TIMEOUT).fuse() => {
_ = Delay::new(execution_timeout).fuse() => {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
+74 -26
View File
@@ -48,8 +48,8 @@ pub struct ValidationHost {
}
impl ValidationHost {
/// Execute PVF with the given code, parameters and priority. The result of execution will be sent
/// to the provided result sender.
/// Execute PVF with the given code, execution timeout, parameters and priority.
/// The result of execution will be sent to the provided result sender.
///
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
@@ -58,12 +58,13 @@ impl ValidationHost {
pub async fn execute_pvf(
&mut self,
pvf: Pvf,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
result_tx: ResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::ExecutePvf { pvf, params, priority, result_tx })
.send(ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
@@ -83,8 +84,16 @@ impl ValidationHost {
}
enum ToHost {
ExecutePvf { pvf: Pvf, params: Vec<u8>, priority: Priority, result_tx: ResultSender },
HeadsUp { active_pvfs: Vec<Pvf> },
ExecutePvf {
pvf: Pvf,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
result_tx: ResultSender,
},
HeadsUp {
active_pvfs: Vec<Pvf>,
},
}
/// Configuration for the validation host.
@@ -200,6 +209,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
/// to the given result sender.
#[derive(Debug)]
struct PendingExecutionRequest {
/// Execution timeout supplied with the request; forwarded unchanged into
/// `execute::ToQueue::Enqueue` when the request is dispatched.
execution_timeout: Duration,
/// Parameters forwarded into `execute::ToQueue::Enqueue` when the request is dispatched.
params: Vec<u8>,
/// Where the result goes; a request whose sender reports `is_canceled()` is skipped
/// instead of being dispatched.
result_tx: ResultSender,
}
@@ -210,11 +220,18 @@ struct PendingExecutionRequest {
struct AwaitingPrepare(HashMap<ArtifactId, Vec<PendingExecutionRequest>>);
impl AwaitingPrepare {
fn add(&mut self, artifact_id: ArtifactId, params: Vec<u8>, result_tx: ResultSender) {
self.0
.entry(artifact_id)
.or_default()
.push(PendingExecutionRequest { params, result_tx });
fn add(
&mut self,
artifact_id: ArtifactId,
execution_timeout: Duration,
params: Vec<u8>,
result_tx: ResultSender,
) {
self.0.entry(artifact_id).or_default().push(PendingExecutionRequest {
execution_timeout,
params,
result_tx,
});
}
fn take(&mut self, artifact_id: &ArtifactId) -> Vec<PendingExecutionRequest> {
@@ -360,7 +377,7 @@ async fn handle_to_host(
to_host: ToHost,
) -> Result<(), Fatal> {
match to_host {
ToHost::ExecutePvf { pvf, params, priority, result_tx } => {
ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx } => {
handle_execute_pvf(
cache_path,
artifacts,
@@ -368,6 +385,7 @@ async fn handle_to_host(
execute_queue,
awaiting_prepare,
pvf,
execution_timeout,
params,
priority,
result_tx,
@@ -389,6 +407,7 @@ async fn handle_execute_pvf(
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
pvf: Pvf,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
result_tx: ResultSender,
@@ -404,6 +423,7 @@ async fn handle_execute_pvf(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id, cache_path),
execution_timeout,
params,
result_tx,
},
@@ -417,7 +437,7 @@ async fn handle_execute_pvf(
)
.await?;
awaiting_prepare.add(artifact_id, params, result_tx);
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
}
} else {
@@ -426,7 +446,7 @@ async fn handle_execute_pvf(
artifacts.insert_preparing(artifact_id.clone());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
awaiting_prepare.add(artifact_id, params, result_tx);
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}
return Ok(())
@@ -499,7 +519,7 @@ async fn handle_prepare_done(
// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
let pending_requests = awaiting_prepare.take(&artifact_id);
for PendingExecutionRequest { params, result_tx } in pending_requests {
for PendingExecutionRequest { execution_timeout, params, result_tx } in pending_requests {
if result_tx.is_canceled() {
// Preparation could've taken quite a bit of time and the requester may be not interested
// in execution anymore, in which case we just skip the request.
@@ -510,6 +530,7 @@ async fn handle_prepare_done(
execute_queue,
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
execution_timeout,
params,
result_tx,
},
@@ -597,6 +618,8 @@ mod tests {
use assert_matches::assert_matches;
use futures::future::BoxFuture;
/// Execution timeout passed to the `execute_pvf` calls in these tests.
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
#[async_std::test]
async fn pulse_test() {
let pulse = pulse_every(Duration::from_millis(100));
@@ -840,9 +863,15 @@ mod tests {
.await;
let (result_tx, _result_rx) = oneshot::channel();
host.execute_pvf(Pvf::from_discriminator(1), vec![], Priority::Critical, result_tx)
.await
.unwrap();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
vec![],
Priority::Critical,
result_tx,
)
.await
.unwrap();
run_until(
&mut test.run,
@@ -862,13 +891,20 @@ mod tests {
let mut host = test.host_handle();
let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
host.execute_pvf(Pvf::from_discriminator(1), b"pvf1".to_vec(), Priority::Normal, result_tx)
.await
.unwrap();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Normal,
result_tx,
)
.await
.unwrap();
let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Critical,
result_tx,
@@ -877,9 +913,15 @@ mod tests {
.unwrap();
let (result_tx, result_rx_pvf_2) = oneshot::channel();
host.execute_pvf(Pvf::from_discriminator(2), b"pvf2".to_vec(), Priority::Normal, result_tx)
.await
.unwrap();
host.execute_pvf(
Pvf::from_discriminator(2),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Normal,
result_tx,
)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,
@@ -947,9 +989,15 @@ mod tests {
let mut host = test.host_handle();
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(Pvf::from_discriminator(1), b"pvf1".to_vec(), Priority::Normal, result_tx)
.await
.unwrap();
host.execute_pvf(
Pvf::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Normal,
result_tx,
)
.await
.unwrap();
assert_matches!(
test.poll_and_recv_to_prepare_queue().await,