Make candidate validation bounded again (#2125)

This PR aims to channel the backpressure of the PVF host's preparation
and execution queues to the candidate validation subsystem consumers.

Related: #708
This commit is contained in:
s0me0ne-unkn0wn
2024-01-21 14:56:44 +01:00
committed by GitHub
parent 21ef949b6e
commit d37a45650e
13 changed files with 199 additions and 113 deletions
@@ -55,10 +55,11 @@ use polkadot_primitives::{
use parity_scale_codec::Encode;
use futures::{channel::oneshot, prelude::*};
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use std::{
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
@@ -81,6 +82,11 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size
// to allow exhaustive validation messages to fall through in case the tasks are clogged with
// `ValidateFromChainState` messages awaiting data from the runtime
const TASK_LIMIT: usize = 30;
/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
@@ -130,6 +136,83 @@ impl<Context> CandidateValidationSubsystem {
}
}
fn handle_validation_message<S>(
mut sender: S,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
S: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;
metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;
metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => async move {
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;
let _ = response_sender.send(precheck_result);
}
.boxed(),
}
}
#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
@@ -156,106 +239,48 @@ async fn run<Context>(
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
let mut tasks = FuturesUnordered::new();
loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();
async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;
metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};
ctx.spawn("validate-from-chain-state", bg.boxed())?;
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
if tasks.len() >= TASK_LIMIT {
break
}
},
Err(e) => return Err(SubsystemError::from(e)),
}
},
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();
_ = tasks.select_next_some() => ()
}
}
async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;
gum::debug!(target: LOG_TARGET, "Validation task limit hit");
metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};
ctx.spawn("validate-from-exhaustive", bg.boxed())?;
loop {
futures::select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(OverseerSignal::ActiveLeaves(_)) => {},
Ok(OverseerSignal::BlockFinalized(..)) => {},
Ok(OverseerSignal::Conclude) => return Ok(()),
Err(e) => return Err(SubsystemError::from(e)),
}
},
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();
async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;
let _ = response_sender.send(precheck_result);
}
};
ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
},
},
_ = tasks.select_next_some() => {
if tasks.len() < TASK_LIMIT {
break
}
}
}
}
}
}