// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
// This file is part of Pezkuwi.
// Pezkuwi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Pezkuwi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Pezkuwi. If not, see <https://www.gnu.org/licenses/>.
//! The Candidate Validation subsystem.
//!
//! This handles incoming requests from other subsystems to validate candidates
//! according to a validation function. This delegates validation to an underlying
//! pool of processes used for execution of the Wasm.
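//!
//! # Example (sketch)
//!
//! Interaction with this subsystem is message-based. The following is a hedged
//! sketch, not an authoritative API reference: `sender` is assumed to be an
//! overseer subsystem sender, and the exact field set of the message is defined
//! by `CandidateValidationMessage` in the subsystem messages crate.
//!
//! ```ignore
//! let (response_sender, response_receiver) = oneshot::channel();
//! sender
//!     .send_message(CandidateValidationMessage::PreCheck {
//!         relay_parent,
//!         validation_code_hash,
//!         response_sender,
//!     })
//!     .await;
//! // The returned `PreCheckOutcome` reports whether the PVF compiled within limits.
//! let outcome = response_receiver.await;
//! ```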
#![deny(unused_crate_dependencies, unused_results)]
#![warn(missing_docs)]
use pezkuwi_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
};
use pezkuwi_node_subsystem::{
errors::RuntimeApiError,
messages::{
CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
SubsystemSender,
};
use pezkuwi_node_subsystem_util::{
self as util,
runtime::{fetch_scheduling_lookahead, ClaimQueueSnapshot},
};
use pezkuwi_overseer::{ActivatedLeaf, ActiveLeavesUpdate};
use pezkuwi_pez_node_primitives::{InvalidCandidate, PoV, ValidationResult};
use pezkuwi_primitives::{
executor_params::{
DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
},
transpose_claim_queue, AuthorityDiscoveryId, CandidateCommitments,
CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
CandidateReceiptV2 as CandidateReceipt,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt, ExecutorParams, Hash,
PersistedValidationData, PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex,
ValidationCode, ValidationCodeHash, ValidatorId,
};
use pezkuwi_teyrchain_primitives::primitives::ValidationResult as WasmValidationResult;
use pezsp_application_crypto::{AppCrypto, ByteArray};
use pezsp_keystore::KeystorePtr;
use codec::Encode;
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use std::{
collections::HashSet,
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
mod metrics;
use self::metrics::Metrics;
#[cfg(test)]
mod tests;
const LOG_TARGET: &'static str = "teyrchain::candidate-validation";
/// The amount of time to wait before retrying after a retry-able approval validation error. We use
/// a higher value for the approval case since we have more time, and if we wait longer it is more
/// likely that transient conditions will resolve.
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size
// to allow exhaustive validation messages to fall through in case the tasks are clogged.
const TASK_LIMIT: usize = 30;
/// Configuration for the candidate validation subsystem
#[derive(Clone, Default)]
pub struct Config {
/// The path where candidate validation can store compiled artifacts for PVFs.
pub artifacts_cache_path: PathBuf,
/// The version of the node. `None` can be passed to skip the version check (only for tests).
pub node_version: Option<String>,
/// Whether the node is attempting to run as a secure validator.
pub secure_validator_mode: bool,
/// Path to the preparation worker binary
pub prep_worker_path: PathBuf,
/// Path to the execution worker binary
pub exec_worker_path: PathBuf,
/// The maximum number of pvf execution workers.
pub pvf_execute_workers_max_num: usize,
/// The maximum number of pvf workers that can be spawned in the pvf prepare pool for tasks
/// with priority below critical.
pub pvf_prepare_workers_soft_max_num: usize,
/// The absolute number of pvf workers that can be spawned in the pvf prepare pool.
pub pvf_prepare_workers_hard_max_num: usize,
}
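// A hedged construction sketch for `Config`; every path and worker count below
// is a placeholder chosen for illustration, not a recommendation:
//
// let config = Config {
//     artifacts_cache_path: "/var/lib/pezkuwi/pvf-artifacts".into(),
//     node_version: None, // skip the version check (tests only)
//     secure_validator_mode: true,
//     prep_worker_path: "/usr/lib/pezkuwi/prepare-worker".into(),
//     exec_worker_path: "/usr/lib/pezkuwi/execute-worker".into(),
//     pvf_execute_workers_max_num: 2,
//     pvf_prepare_workers_soft_max_num: 1,
//     pvf_prepare_workers_hard_max_num: 2,
// };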
/// The candidate validation subsystem.
pub struct CandidateValidationSubsystem {
keystore: KeystorePtr,
#[allow(missing_docs)]
pub metrics: Metrics,
#[allow(missing_docs)]
pub pvf_metrics: pezkuwi_node_core_pvf::Metrics,
config: Option<Config>,
}
impl CandidateValidationSubsystem {
/// Create a new `CandidateValidationSubsystem`.
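///
/// A hedged usage sketch, assuming a `Config` built as in the sketch above and
/// keystore/metrics values created elsewhere; passing `None` for the config
/// yields a dummy subsystem (see `start` below):
///
/// ```ignore
/// let subsystem = CandidateValidationSubsystem::with_config(
///     Some(config),
///     keystore,
///     metrics,
///     pvf_metrics,
/// );
/// ```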
pub fn with_config(
config: Option<Config>,
keystore: KeystorePtr,
metrics: Metrics,
pvf_metrics: pezkuwi_node_core_pvf::Metrics,
) -> Self {
CandidateValidationSubsystem { keystore, config, metrics, pvf_metrics }
}
}
#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
impl CandidateValidationSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
if let Some(config) = self.config {
let future = run(ctx, self.keystore, self.metrics, self.pvf_metrics, config)
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
.boxed();
SpawnedSubsystem { name: "candidate-validation-subsystem", future }
} else {
pezkuwi_overseer::DummySubsystem.start(ctx)
}
}
}
// Returns the claim queue at the relay parent and logs a warning if it is not available.
async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
match util::runtime::fetch_claim_queue(sender, relay_parent).await {
Ok(cq) => Some(cq),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
?err,
"Claim queue not available"
);
None
},
}
}
fn handle_validation_message<S>(
mut sender: S,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
S: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => async move {
let _timer = metrics.time_validate_from_exhaustive();
let relay_parent = candidate_receipt.descriptor.relay_parent();
let maybe_claim_queue = claim_queue(relay_parent, &mut sender).await;
let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
let error = "cannot fetch session index from the runtime";
gum::warn!(
target: LOG_TARGET,
?relay_parent,
error,
);
let _ = response_sender
.send(Err(ValidationFailed("Session index not found".to_string())));
return;
};
// This will return a default value for the limit if the runtime API is not available.
// However, we still error out on an unexpected runtime API error.
let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
relay_parent,
session_index,
&mut sender,
)
.await
else {
let error = "cannot fetch validation code bomb limit from the runtime";
gum::warn!(
target: LOG_TARGET,
?relay_parent,
error,
);
let _ = response_sender.send(Err(ValidationFailed(
"Validation code bomb limit not available".to_string(),
)));
return;
};
let res = validate_candidate_exhaustive(
session_index,
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
maybe_claim_queue,
validation_code_bomb_limit,
)
.await;
metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed(),
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => async move {
let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
let error = "cannot fetch session index from the runtime";
gum::warn!(
target: LOG_TARGET,
?relay_parent,
error,
);
let _ = response_sender.send(PreCheckOutcome::Failed);
return;
};
// This will return a default value for the limit if the runtime API is not available.
// However, we still error out on an unexpected runtime API error.
let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
relay_parent,
session_index,
&mut sender,
)
.await
else {
let error = "cannot fetch validation code bomb limit from the runtime";
gum::warn!(
target: LOG_TARGET,
?relay_parent,
error,
);
let _ = response_sender.send(PreCheckOutcome::Failed);
return;
};
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
validation_code_bomb_limit,
)
.await;
let _ = response_sender.send(precheck_result);
}
.boxed(),
}
}
#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run(
mut ctx: Context,
keystore: KeystorePtr,
metrics: Metrics,
pvf_metrics: pezkuwi_node_core_pvf::Metrics,
Config {
artifacts_cache_path,
node_version,
secure_validator_mode,
prep_worker_path,
exec_worker_path,
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
}: Config,
) -> SubsystemResult<()> {
let (mut validation_host, task) = pezkuwi_node_core_pvf::start(
pezkuwi_node_core_pvf::Config::new(
artifacts_cache_path,
node_version,
secure_validator_mode,
prep_worker_path,
exec_worker_path,
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
),
pvf_metrics,
)
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
let mut tasks = FuturesUnordered::new();
let mut prepare_state = PrepareValidationState::default();
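// Main-loop structure: the inner loop below accepts new validation requests
// until `TASK_LIMIT` tasks are in flight; the second loop further down then
// only drains finished tasks (and handles signals) until there is capacity
// again, providing backpressure towards the overseer.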
loop {
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
handle_active_leaves_update(
ctx.sender(),
keystore.clone(),
&mut validation_host,
update,
&mut prepare_state,
).await
},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
if tasks.len() >= TASK_LIMIT {
break
}
},
Err(e) => return Err(SubsystemError::from(e)),
}
},
_ = tasks.select_next_some() => ()
}
}
gum::debug!(target: LOG_TARGET, "Validation task limit hit");
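// At the task limit: stop accepting new requests and only react to signals
// while draining in-flight tasks back below the limit.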
loop {
futures::select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(OverseerSignal::ActiveLeaves(_)) => {},
Ok(OverseerSignal::BlockFinalized(..)) => {},
Ok(OverseerSignal::Conclude) => return Ok(()),
Err(e) => return Err(SubsystemError::from(e)),
}
},
_ = tasks.select_next_some() => {
if tasks.len() < TASK_LIMIT {
break
}
}
}
}
}
}
struct PrepareValidationState {
session_index: Option<SessionIndex>,
is_next_session_authority: bool,
// The PVF host won't prepare the same code hash twice, so here we just avoid extra communication.
already_prepared_code_hashes: HashSet<ValidationCodeHash>,
// How many PVFs we prepare per block ahead of validation in the next session.
per_block_limit: usize,
}
impl Default for PrepareValidationState {
fn default() -> Self {
Self {
session_index: None,
is_next_session_authority: false,
already_prepared_code_hashes: HashSet::new(),
per_block_limit: 1,
}
}
}
async fn handle_active_leaves_update<Sender>(
sender: &mut Sender,
keystore: KeystorePtr,
validation_host: &mut impl ValidationBackend,
update: ActiveLeavesUpdate,
prepare_state: &mut PrepareValidationState,
) where
Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
let maybe_session_index = update_active_leaves(sender, validation_host, update.clone()).await;
if let Some(activated) = update.activated {
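// Treat the session as new when the fetched index strictly exceeds the one we
// recorded, or when we have not recorded any session yet; if no index could be
// fetched, there is nothing new to prepare for.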
let maybe_new_session_index = match (prepare_state.session_index, maybe_session_index) {
(Some(existing_index), Some(new_index)) => {
(new_index > existing_index).then_some(new_index)
},
(None, Some(new_index)) => Some(new_index),
_ => None,
};
maybe_prepare_validation(
sender,
keystore.clone(),
validation_host,
activated,
prepare_state,
maybe_new_session_index,
)
.await;
}
}
async fn maybe_prepare_validation<Sender>(
sender: &mut Sender,
keystore: KeystorePtr,
validation_backend: &mut impl ValidationBackend,
leaf: ActivatedLeaf,
state: &mut PrepareValidationState,
new_session_index: Option<SessionIndex>,
) where
Sender: SubsystemSender<RuntimeApiMessage>,
{
if new_session_index.is_some() {
state.session_index = new_session_index;
state.already_prepared_code_hashes.clear();
state.is_next_session_authority = check_next_session_authority(
sender,
keystore,
leaf.hash,
state.session_index.expect("qed: just checked above"),
)
.await;
}
// On every active leaf, check candidates and prepare PVFs our node doesn't have yet.
if state.is_next_session_authority {
let code_hashes = prepare_pvfs_for_backed_candidates(
sender,
validation_backend,
leaf.hash,
&state.already_prepared_code_hashes,
state.per_block_limit,
)
.await;
state.already_prepared_code_hashes.extend(code_hashes.unwrap_or_default());
}
}
async fn get_session_index<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<SessionIndex>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
let Ok(Ok(session_index)) =
util::request_session_index_for_child(relay_parent, sender).await.await
else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch session index from runtime API",
);
return None;
};
Some(session_index)
}
// Returns true if the node is an authority in the next session.
async fn check_next_session_authority<Sender>(
sender: &mut Sender,
keystore: KeystorePtr,
relay_parent: Hash,
session_index: SessionIndex,
) -> bool
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
// Despite the function name, we request past, present, and future authorities here.
// It's OK to still prepare PVFs in the other cases, but it's better to request only future ones.
let Ok(Ok(authorities)) = util::request_authorities(relay_parent, sender).await.await else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch authorities from runtime API",
);
return false;
};
// We need to exclude at least the current session authorities from the previous request.
let Ok(Ok(Some(session_info))) =
util::request_session_info(relay_parent, session_index, sender).await.await
else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch session info from runtime API",
);
return false;
};
let is_past_present_or_future_authority = authorities
.iter()
.any(|v| keystore.has_keys(&[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]));
// We could have checked `discovery_keys`, but on Dicle `validators.len() < discovery_keys.len()`.
let is_present_validator = session_info
.validators
.iter()
.any(|v| keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]));
// There is still a chance we are a previous-session authority, but this extra work does not
// affect finalization.
is_past_present_or_future_authority && !is_present_validator
}
// Sends PVFs with unknown code hashes to the validation host, returning the list of code hashes sent.
async fn prepare_pvfs_for_backed_candidates<Sender>(
sender: &mut Sender,
validation_backend: &mut impl ValidationBackend,
relay_parent: Hash,
already_prepared: &HashSet<ValidationCodeHash>,
per_block_limit: usize,
) -> Option<Vec<ValidationCodeHash>>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch candidate events from runtime API",
);
return None;
};
let code_hashes = events
.into_iter()
.filter_map(|e| match e {
CandidateEvent::CandidateBacked(receipt, ..) => {
let h = receipt.descriptor.validation_code_hash();
if already_prepared.contains(&h) {
None
} else {
Some(h)
}
},
_ => None,
})
.take(per_block_limit)
.collect::<Vec<_>>();
let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
"cannot fetch executor params for the session",
);
return None;
};
let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let mut active_pvfs = vec![];
let mut processed_code_hashes = vec![];
for code_hash in code_hashes {
let Ok(Ok(Some(validation_code))) =
util::request_validation_code_by_hash(relay_parent, code_hash, sender)
.await
.await
else {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
?code_hash,
"cannot fetch validation code hash from runtime API",
);
continue;
};
let Some(session_index) = get_session_index(sender, relay_parent).await else { continue };
let validation_code_bomb_limit = match util::runtime::fetch_validation_code_bomb_limit(
relay_parent,
session_index,
sender,
)
.await
{
Ok(limit) => limit,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
?err,
"cannot fetch validation code bomb limit from runtime API",
);
continue;
},
};
let pvf = PvfPrepData::from_code(
validation_code.0,
executor_params.clone(),
timeout,
PrepareJobKind::Prechecking,
validation_code_bomb_limit,
);
active_pvfs.push(pvf);
processed_code_hashes.push(code_hash);
}
if active_pvfs.is_empty() {
return None;
}
if let Err(err) = validation_backend.heads_up(active_pvfs).await {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
?err,
"cannot prepare PVF for the next session",
);
return None;
};
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?processed_code_hashes,
"Prepared PVF for the next session",
);
Some(processed_code_hashes)
}
async fn update_active_leaves<Sender>(
sender: &mut Sender,
validation_backend: &mut impl ValidationBackend,
update: ActiveLeavesUpdate,
) -> Option<SessionIndex>
where
Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
let maybe_new_leaf = if let Some(activated) = &update.activated {
get_session_index(sender, activated.hash)
.await
.map(|index| (activated.hash, index))
} else {
None
};
let ancestors = get_block_ancestors(sender, maybe_new_leaf).await;
if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await {
gum::warn!(
target: LOG_TARGET,
?err,
"cannot update active leaves in validation backend",
);
};
maybe_new_leaf.map(|l| l.1)
}
async fn get_block_ancestors<Sender>(
sender: &mut Sender,
maybe_new_leaf: Option<(Hash, SessionIndex)>,
) -> Vec<Hash>
where
Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
let Some((relay_parent, session_index)) = maybe_new_leaf else { return vec![] };
let scheduling_lookahead =
match fetch_scheduling_lookahead(relay_parent, session_index, sender).await {
Ok(scheduling_lookahead) => scheduling_lookahead,
res => {
gum::warn!(target: LOG_TARGET, ?res, "Failed to request scheduling lookahead");
return vec![];
},
};
let (tx, rx) = oneshot::channel();
sender
.send_message(ChainApiMessage::Ancestors {
hash: relay_parent,
// Subtract 1 from the scheduling lookahead, as it includes the current `relay_parent`.
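// E.g. with a scheduling lookahead of 3, we request 2 ancestors, so the
// window covers `relay_parent` plus its two most recent ancestors.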
k: scheduling_lookahead.saturating_sub(1) as usize,
response_channel: tx,
})
.await;
match rx.await {
Ok(Ok(x)) => x,
res => {
gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
vec![]
},
}
}
struct RuntimeRequestFailed;
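// A hedged usage sketch for `runtime_api_request` below; the request variant
// and channel wiring are illustrative (see `request_validation_code_by_hash`
// for the real call site):
//
// let (tx, rx) = oneshot::channel();
// let maybe_code = runtime_api_request(
//     sender,
//     relay_parent,
//     RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
//     rx,
// )
// .await;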
async fn runtime_api_request<T, Sender>(
sender: &mut Sender,
relay_parent: Hash,
request: RuntimeApiRequest,
receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
) -> Result<T, RuntimeRequestFailed>
where
Sender: SubsystemSender<RuntimeApiMessage>,
{
sender
.send_message(RuntimeApiMessage::Request(relay_parent, request).into())
.await;
receiver
.await
.map_err(|_| {
gum::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped");
RuntimeRequestFailed
})
.and_then(|res| {
res.map_err(|e| {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
err = ?e,
"Runtime API request internal error"
);
RuntimeRequestFailed
})
})
}
async fn request_validation_code_by_hash<Sender>(
sender: &mut Sender,
relay_parent: Hash,
validation_code_hash: ValidationCodeHash,
) -> Result<Option<ValidationCode>, RuntimeRequestFailed>