pezkuwi-subxt/polkadot/node/subsystem-util/src/lib.rs
s0me0ne-unkn0wn dd0a556665 Executor Environment parameterization (#6161)
* Re-apply changes without Diener, rebase to the latest master

* Cache pruning

* Bit-pack InstantiationStrategy

* Move ExecutorParams version inside the structure itself

* Rework runtime API and executor parameters storage

* Pass executor parameters through backing subsystem

* Update Cargo.lock

* Introduce `ExecutorParams` to approval voting subsys

* Introduce `ExecutorParams` to dispute coordinator

* `cargo fmt`

* Simplify requests from backing subsys

* Fix tests

* Replace manual config cloning with `.clone()`

* Move constants to module

* Parametrize executor performing PVF pre-check

* Fix Malus

* Fix test runtime

* Introduce session executor params as a constant defined by session info
pallet

* Use Parity SCALE codec instead of hand-crafted binary encoding

* Get rid of constants; Add docs

* Get rid of constants

* Minor typo

* Fix Malus after rebase

* `cargo fmt`

* Use transparent SCALE encoding instead of explicit

* Clean up

* Get rid of relay parent to session index mapping

* Join environment type and version in a single enum element

* Use default execution parameters if running an old runtime

* `unwrap()` -> `expect()`

* Correct API version

* Constants are back in town

* Use constants for execution environment types

* Artifact separation, first try

* Get rid of explicit version

* PVF execution queue worker separation

* Worker handshake

* Global renaming

* Minor fixes resolving discussions

* Two-stage requesting of executor params to make use of runtime API cache

* Proper error handling in pvf-checker

* Executor params storage bootstrapping

* Propagate migration to v3 network runtimes

* Fix storage versioning

* Ensure `ExecutorParams` serialization determinism; Add comments

* Rename constants to make things a bit more deterministic
Get rid of stale code

* Tidy up a structure of active PVFs

* Minor formatting

* Fix comment

* Add try-runtime hooks

* Add storage version write on upgrade

Co-authored-by: Andronik <write@reusable.software>

* Add pre- and post-upgrade assertions

* Require to specify environment type; Remove redundant `impl`s

* Add `ExecutorParamHash` creation from `H256`

* Fix candidate validation subsys tests

* Return splittable error from executor params request fn

* Revert "Return splittable error from executor params request fn"

This reverts commit a0b274177d8bb2f6e13c066741892ecd2e72a456.

* Decompose approval voting metrics

* Use more relevant errors

* Minor formatting fix

* Assert a valid environment type instead of checking

* Fix `try-runtime` hooks

* After-merge fixes

* Add migration logs

* Remove dead code

* Fix tests

* Fix tests

* Back to the strongly typed implementation

* Promote strong types to executor interface

* Remove stale comment

* Move executor params to `SessionInfo`: primitives and runtime

* Move executor params to `SessionInfo`: node

* Try to bump primitives and API version

* Get rid of `MallocSizeOf`

* Bump target API version to v4

* Make use of session index already in place

* Back to v3

* Fix all the tests

* Add migrations to all the runtimes

* Make use of existing `SessionInfo` in approval voting subsys

* Rename `TARGET` -> `LOG_TARGET`

* Bump all the primitives to v3

* Fix Rococo ParachainHost API version

* Use `RollingSessionWindow` to acquire `ExecutorParams` in disputes

* Fix nits from discussions; add comments

* Re-evaluate queue logic

* Rework job assignment in execution queue

* Add documentation

* Use `RuntimeInfo` to obtain `SessionInfo` (with blackjack and caching)

* Couple `Pvf` with `ExecutorParams` wherever possible

* Put members of `PvfWithExecutorParams` under `Arc` for cheap cloning

* Fix comment

* Fix CI tests

* Fix clippy warnings

* Address nits from discussions

* Add a placeholder for raw data

* Fix non exhaustive match

* Remove redundant reexports and fix imports

* Keep only necessary semantic features, as discussed

* Rework `RuntimeInfo` to support mock implementation for tests

* Remove unneeded bound

* `cargo fmt`

* Revert "Remove unneeded bound"

This reverts commit 932463f26b00ce290e1e61848eb9328632ef8a61.

* Fix PVF host tests

* Fix PVF checker tests

* Fix overseer declarations

* Simplify tests

* `MAX_KEEP_WAITING` timeout based on `BACKGING_EXECUTION_TIMEOUT`

* Add a unit test for varying executor parameters

* Minor fixes from discussions

* Add prechecking max. memory parameter (see paritytech/srlabs_findings#110)

* Fix and improve a test

* Remove `ExecutionEnvironment` and `RawData`

* New primitives versioning in parachain host API

* `disputes()` implementation for Kusama and Polkadot

* Move `ExecutorParams` from `vstaging` to stable primitives

* Move disputes from `vstaging` to stable implementation

* Fix `try-runtime`

* Fixes after merge

* Move `ExecutorParams` to the bottom of `SessionInfo`

* Revert "Move executor params to `SessionInfo`: primitives and runtime"

This reverts commit dfcfb85fefd1c5be6c8a8f72dc09fd1809cfa9ce.

* Always use fresh activated live hash in pvf precheck
(re-apply 34b09a4c20de17e7926ed942cd0d657d18f743fa)

* Fixing tests (broken commit)

* Fix candidate validation tests

* Fix PVF host test

* Minor fixes

* Address discussions

* Restore migration

* Fix `use` to only include what is needed instead of `*`

* Add comment to never touch `DEFAULT_CONFIG`

* Update migration to set default `ExecutorParams` for `dispute_period`
sessions back

* Use `earliest_stored_session` instead of calculations

* Nit

* Add logs

* Treat any runtime error as `NotSupported` again

* Always return default executor params if not available

* Revert "Always return default executor params if not available"

This reverts commit b58ac4482ef444c67a9852d5776550d08e312f30.

* Add paritytech/substrate#9997 workaround

* `cargo fmt`

* Remove migration (again!)

* Bump executor params to API v4 (backport from #6698)

---------

Co-authored-by: Andronik <write@reusable.software>
2023-02-15 11:26:09 +00:00

436 lines
15 KiB
Rust

// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.
//!
//! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems.

#![warn(missing_docs)]

use polkadot_node_subsystem::{
	errors::{RuntimeApiError, SubsystemError},
	messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
	overseer, SubsystemSender,
};
use polkadot_primitives::vstaging::ExecutorParams;

pub use overseer::{
	gen::{OrchestraError as OverseerError, Timeout},
	Subsystem, TimeoutExt,
};

pub use polkadot_node_metrics::{metrics, Metronome};

use futures::channel::{mpsc, oneshot};
use parity_scale_codec::Encode;

use polkadot_primitives::{
	AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs,
	GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
	PersistedValidationData, ScrapedOnChainVotes, SessionIndex, SessionInfo, Signed,
	SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
	ValidatorSignature,
};
pub use rand;
use sp_application_crypto::AppKey;
use sp_core::ByteArray;
use sp_keystore::{CryptoStore, Error as KeystoreError, SyncCryptoStorePtr};
use std::time::Duration;
use thiserror::Error;

pub use metered;
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;

pub use determine_new_blocks::determine_new_blocks;

/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
	pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
}

/// A rolling session window cache.
pub mod rolling_session_window;
/// Convenient and efficient runtime info access.
pub mod runtime;

/// Database trait for subsystem.
pub mod database;

/// Nested message sending
///
/// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous
/// tasks, sending messages back.
pub mod nesting_sender;

mod determine_new_blocks;

#[cfg(test)]
mod tests;

/// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs
pub const JOB_CHANNEL_CAPACITY: usize = 64;

/// Utility errors
#[derive(Debug, Error)]
pub enum Error {
	/// Attempted to send or receive on a oneshot channel which had been canceled
	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),
	/// Attempted to send on an MPSC channel which has been canceled
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	/// A subsystem error
	#[error(transparent)]
	Subsystem(#[from] SubsystemError),
	/// An error in the Runtime API.
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),
	/// The type system wants this even though it doesn't make sense
	#[error(transparent)]
	Infallible(#[from] std::convert::Infallible),
	/// Attempted to convert from an `AllMessages` to a `FromJob`, and failed.
	#[error("AllMessage not relevant to Job")]
	SenderConversion(String),
	/// The local node is not a validator.
	#[error("Node is not a validator")]
	NotAValidator,
	/// Already forwarding errors to another sender
	#[error("AlreadyForwarding")]
	AlreadyForwarding,
	/// Data that is supposed to be present is not there
	#[error("Data are not available")]
	DataNotAvailable,
}

impl From<OverseerError> for Error {
	fn from(e: OverseerError) -> Self {
		Self::from(SubsystemError::from(e))
	}
}

/// A type alias for Runtime API receivers.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;

/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
	parent: Hash,
	sender: &mut Sender,
	request_builder: RequestBuilder,
) -> RuntimeApiReceiver<Response>
where
	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	let (tx, rx) = oneshot::channel();

	sender
		.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
		.await;

	rx
}

/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
macro_rules! specialize_requests {
	// expand return type name for documentation purposes
	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		specialize_requests!{
			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
		}
	};

	// create a single specialized request function
	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		#[doc = "Request `"]
		#[doc = $doc_name]
		#[doc = "` from the runtime"]
		pub async fn $func_name (
			parent: Hash,
			$(
				$param_name: $param_ty,
			)*
			sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
		) -> RuntimeApiReceiver<$return_ty>
		{
			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
		}
	};

	// recursive decompose
	(
		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
		$(
			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
		)+
	) => {
		specialize_requests!{
			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
		}

		specialize_requests!{
			$(
				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
			)+
		}
	};
}

specialize_requests! {
	fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
	fn request_validators() -> Vec<ValidatorId>; Validators;
	fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
	fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
	fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
	fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
	fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
	fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
	fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
	fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
	fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
	fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
		-> Option<ValidationCodeHash>; ValidationCodeHash;
	fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
	fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>; SessionExecutorParams;
}

/// Requests executor parameters from the runtime effective at the given relay-parent. It first
/// obtains the session index at the relay-parent, relying on the fact that it should be cached by
/// the runtime API caching layer even if the block itself has already been pruned, and then
/// requests the executor parameters for that session index.
///
/// Returns an error if communication with the runtime fails, if a runtime API call fails, or if
/// the parameters are not in storage (which should never happen).
/// Returns the default executor parameters if the runtime does not yet support the
/// `SessionExecutorParams` API call.
/// Otherwise, returns the executor parameters provided by the runtime.
pub async fn executor_params_at_relay_parent(
	relay_parent: Hash,
	sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<ExecutorParams, Error> {
	match request_session_index_for_child(relay_parent, sender).await.await {
		Err(err) => {
			// Failed to communicate with the runtime
			Err(Error::Oneshot(err))
		},
		Ok(Err(err)) => {
			// Runtime has failed to obtain a session index at the relay-parent.
			Err(Error::RuntimeApi(err))
		},
		Ok(Ok(session_index)) => {
			match request_session_executor_params(relay_parent, session_index, sender).await.await {
				Err(err) => {
					// Failed to communicate with the runtime
					Err(Error::Oneshot(err))
				},
				Ok(Err(RuntimeApiError::NotSupported { .. })) => {
					// Runtime doesn't yet support the requested API; execute anyway
					// with the default set of parameters
					Ok(ExecutorParams::default())
				},
				Ok(Err(err)) => {
					// Runtime failed to execute the request
					Err(Error::RuntimeApi(err))
				},
				Ok(Ok(None)) => {
					// Storage doesn't contain a parameter set for the given session; should
					// never happen
					Err(Error::DataNotAvailable)
				},
				Ok(Ok(Some(executor_params))) => Ok(executor_params),
			}
		},
	}
}

/// From the given set of validators, find the first key we can sign with, if any.
pub async fn signing_key(
	validators: &[ValidatorId],
	keystore: &SyncCryptoStorePtr,
) -> Option<ValidatorId> {
	signing_key_and_index(validators, keystore).await.map(|(k, _)| k)
}

/// From the given set of validators, find the first key we can sign with, if any, and return it
/// along with the validator index.
pub async fn signing_key_and_index(
	validators: &[ValidatorId],
	keystore: &SyncCryptoStorePtr,
) -> Option<(ValidatorId, ValidatorIndex)> {
	for (i, v) in validators.iter().enumerate() {
		if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await {
			return Some((v.clone(), ValidatorIndex(i as _)))
		}
	}
	None
}

/// Sign the given data with the given validator ID.
///
/// Returns `Ok(None)` if the private key that corresponds to that validator ID is not found in the
/// given keystore. Returns an error if the key could not be used for signing.
pub async fn sign(
	keystore: &SyncCryptoStorePtr,
	key: &ValidatorId,
	data: &[u8],
) -> Result<Option<ValidatorSignature>, KeystoreError> {
	let signature =
		CryptoStore::sign_with(&**keystore, ValidatorId::ID, &key.into(), &data).await?;

	match signature {
		Some(sig) =>
			Ok(Some(sig.try_into().map_err(|_| KeystoreError::KeyNotSupported(ValidatorId::ID))?)),
		None => Ok(None),
	}
}

/// Find the validator group the given validator index belongs to.
pub fn find_validator_group(
	groups: &[Vec<ValidatorIndex>],
	index: ValidatorIndex,
) -> Option<GroupIndex> {
	groups.iter().enumerate().find_map(|(i, g)| {
		if g.contains(&index) {
			Some(GroupIndex(i as _))
		} else {
			None
		}
	})
}

/// Choose a random subset of `min` elements, always keeping every element for which
/// `is_priority` returns `true`.
///
/// If there are `min` or more priority elements, exactly the priority elements are kept; if `v`
/// has fewer than `min` elements, all of them are kept.
pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
	choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
}

/// Choose a random subset of `min` elements using the given random number generator `rng`.
/// Priority elements are always kept, as in [`choose_random_subset`].
pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
	is_priority: F,
	v: &mut Vec<T>,
	rng: &mut R,
	min: usize,
) {
	use rand::seq::SliceRandom as _;

	// partition the elements into priority first
	// the returned index is where the non-priority elements start
	let i = itertools::partition(v.iter_mut(), is_priority);

	if i >= min || v.len() <= i {
		v.truncate(i);
		return
	}

	v[i..].shuffle(rng);

	v.truncate(min);
}

/// Returns a `bool` with a probability of `a / b` of being true.
///
/// Panics if `b` is zero or `a` is greater than `b` (see `rand::Rng::gen_ratio`).
pub fn gen_ratio(a: usize, b: usize) -> bool {
	gen_ratio_rng(a, b, &mut rand::thread_rng())
}

/// Returns a `bool` with a probability of `a / b` of being true, using the given random number
/// generator.
///
/// Panics if `b` is zero or `a` is greater than `b` (see `rand::Rng::gen_ratio`).
pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
	rng.gen_ratio(a as u32, b as u32)
}

/// Local validator information
///
/// It can be created if the local node is a validator in the context of a particular
/// relay chain block.
#[derive(Debug)]
pub struct Validator {
	signing_context: SigningContext,
	key: ValidatorId,
	index: ValidatorIndex,
}

impl Validator {
	/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
	pub async fn new<S>(
		parent: Hash,
		keystore: SyncCryptoStorePtr,
		sender: &mut S,
	) -> Result<Self, Error>
	where
		S: SubsystemSender<RuntimeApiMessage>,
	{
		// Note: request_validators and request_session_index_for_child do not and cannot
		// run concurrently: they both have a mutable handle to the same sender.
		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
		let (validators, session_index) = futures::try_join!(
			request_validators(parent, sender).await,
			request_session_index_for_child(parent, sender).await,
		)?;

		let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };

		let validators = validators?;

		Self::construct(&validators, signing_context, keystore).await
	}

	/// Construct a validator instance without performing runtime fetches.
	///
	/// This can be useful if external code also needs the same data.
	pub async fn construct(
		validators: &[ValidatorId],
		signing_context: SigningContext,
		keystore: SyncCryptoStorePtr,
	) -> Result<Self, Error> {
		let (key, index) =
			signing_key_and_index(validators, &keystore).await.ok_or(Error::NotAValidator)?;

		Ok(Validator { signing_context, key, index })
	}

	/// Get this validator's id.
	pub fn id(&self) -> ValidatorId {
		self.key.clone()
	}

	/// Get this validator's local index.
	pub fn index(&self) -> ValidatorIndex {
		self.index
	}

	/// Get the current signing context.
	pub fn signing_context(&self) -> &SigningContext {
		&self.signing_context
	}

	/// Sign a payload with this validator
	pub async fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
		&self,
		keystore: SyncCryptoStorePtr,
		payload: Payload,
	) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
	}
}
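
The `impl From<OverseerError> for Error` in this file exists because Rust does not compose `From` conversions: `#[from] SubsystemError` only yields `Error: From<SubsystemError>`, so using `?` on an `OverseerError` needs an explicit bridge. The std-only sketch below reproduces that shape with hypothetical stand-in types (`OverseerError`, `SubsystemError`, `spawn_job`, `start_job`); it is not the real `polkadot-node-subsystem` error hierarchy, and it writes out by hand the `From` impl that `thiserror`'s `#[from]` would generate.

```rust
/// Hypothetical stand-in for `OrchestraError` (re-exported as `OverseerError`).
#[derive(Debug)]
pub struct OverseerError(pub String);

/// Hypothetical stand-in for `SubsystemError`.
#[derive(Debug)]
pub enum SubsystemError {
    Overseer(OverseerError),
}

impl From<OverseerError> for SubsystemError {
    fn from(e: OverseerError) -> Self {
        SubsystemError::Overseer(e)
    }
}

/// Hypothetical, reduced version of this module's `Error`.
#[derive(Debug)]
pub enum Error {
    Subsystem(SubsystemError),
}

// The impl that `#[from] SubsystemError` would generate.
impl From<SubsystemError> for Error {
    fn from(e: SubsystemError) -> Self {
        Error::Subsystem(e)
    }
}

// `From` impls do not chain, so `?` on an `OverseerError` needs this bridge,
// which routes the conversion through `SubsystemError`.
impl From<OverseerError> for Error {
    fn from(e: OverseerError) -> Self {
        Self::from(SubsystemError::from(e))
    }
}

/// Hypothetical overseer operation that may fail.
fn spawn_job(fail: bool) -> Result<(), OverseerError> {
    if fail {
        Err(OverseerError("spawn failed".to_string()))
    } else {
        Ok(())
    }
}

/// `?` converts the `OverseerError` into `Error` via the bridge impl.
fn start_job(fail: bool) -> Result<(), Error> {
    spawn_job(fail)?;
    Ok(())
}
```

Without the bridge impl, `spawn_job(fail)?` in `start_job` would not compile, because `?` performs exactly one `From::from` call.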
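
`request_from_runtime` creates a oneshot channel, lets the caller's `request_builder` embed the sending half in a `RuntimeApiRequest`, forwards the request, and returns the receiving half. The sketch below mimics that request/response pattern using only `std::sync::mpsc` and a thread standing in for the runtime API subsystem. The `Request` enum and the `spawn_runtime` helper are hypothetical; the real code uses `futures::channel::oneshot` and `SubsystemSender::send_message`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for `RuntimeApiRequest`: every variant carries the
/// channel on which the answer must be sent back.
enum Request {
    SessionIndexForChild(Sender<Result<u32, String>>),
    SessionInfo(u32, Sender<Result<Option<String>, String>>),
}

/// Same shape as `request_from_runtime`: make a reply channel, let the
/// builder embed the sending half in a request, send it, return the receiver.
fn request_from_runtime<R>(
    runtime: &Sender<Request>,
    request_builder: impl FnOnce(Sender<R>) -> Request,
) -> Receiver<R> {
    let (tx, rx) = channel();
    // If the runtime thread is gone, the request (and with it `tx`) is dropped,
    // so the caller's `recv()` fails, similar to a canceled oneshot.
    let _ = runtime.send(request_builder(tx));
    rx
}

/// A toy "runtime" thread that answers requests until every sender is dropped.
fn spawn_runtime() -> (Sender<Request>, thread::JoinHandle<()>) {
    let (tx, rx) = channel::<Request>();
    let handle = thread::spawn(move || {
        for request in rx {
            match request {
                Request::SessionIndexForChild(reply) => {
                    let _ = reply.send(Ok(7));
                },
                Request::SessionInfo(index, reply) => {
                    let info = (index == 7).then(|| format!("session {}", index));
                    let _ = reply.send(Ok(info));
                },
            }
        }
    });
    (tx, handle)
}
```

Tuple-variant constructors are themselves functions, so a variant such as `Request::SessionIndexForChild` can be passed directly as the builder. Taking a builder instead of a finished request lets the helper own channel creation, so a caller cannot forget to attach the reply channel.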
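
`specialize_requests!` peels the first `fn ...;` declaration off its input, expands it into one function, and recurses on the remaining declarations. A reduced, std-only version of the same recursive `macro_rules!` technique is sketched below. The `Request` enum and the `u64` field standing in for the relay-parent hash are hypothetical simplifications, and the generated functions only build a request value instead of sending it to the runtime.

```rust
/// Hypothetical, simplified stand-in for `RuntimeApiRequest`; the first `u64`
/// field plays the role of the relay-parent hash.
#[derive(Debug, PartialEq)]
pub enum Request {
    Validators(u64),
    SessionInfo(u64, u32),
    ValidationCode(u64, u32, bool),
}

macro_rules! specialize_requests {
    // A single declaration: emit one function.
    (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) ; $variant:ident;) => {
        #[doc = concat!("Build a `", stringify!($variant), "` request.")]
        pub fn $func_name(parent: u64 $(, $param_name: $param_ty )*) -> Request {
            Request::$variant(parent $(, $param_name )*)
        }
    };
    // Several declarations: expand the first one, then recurse on the rest.
    (
        fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) ; $variant:ident;
        $( fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) ; $t_variant:ident; )+
    ) => {
        specialize_requests! { fn $func_name( $( $param_name : $param_ty ),* ) ; $variant; }
        specialize_requests! { $( fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) ; $t_variant; )+ }
    };
}

specialize_requests! {
    fn request_validators() ; Validators;
    fn request_session_info(index: u32) ; SessionInfo;
    fn request_validation_code(para_id: u32, pending: bool) ; ValidationCode;
}
```

When the input holds more than one declaration, the single-declaration arm fails to match at the second `fn`, so `macro_rules!` falls through to the recursive arm.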
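
The inner `match` in `executor_params_at_relay_parent` turns every possible outcome of the executor-params request into either parameters or an `Error`. The same decision table is shown below as a pure function over hypothetical, simplified stand-ins for `ExecutorParams`, `RuntimeApiError`, and `oneshot::Canceled`. Only the mapping itself (including the fallback to defaults on `NotSupported` and `None` becoming `DataNotAvailable`) is taken from the code above; the name `interpret_executor_params` is illustrative.

```rust
/// Hypothetical, simplified stand-ins for the real types.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExecutorParams(Vec<String>);

#[derive(Debug, PartialEq)]
pub enum RuntimeApiError {
    NotSupported { runtime_api_name: &'static str },
    Execution(String),
}

#[derive(Debug, PartialEq)]
pub struct Canceled;

#[derive(Debug, PartialEq)]
pub enum Error {
    Oneshot(Canceled),
    RuntimeApi(RuntimeApiError),
    DataNotAvailable,
}

/// Maps the outcome of a `SessionExecutorParams`-style request the way
/// `executor_params_at_relay_parent` does.
pub fn interpret_executor_params(
    response: Result<Result<Option<ExecutorParams>, RuntimeApiError>, Canceled>,
) -> Result<ExecutorParams, Error> {
    match response {
        // The reply channel was dropped before an answer arrived.
        Err(canceled) => Err(Error::Oneshot(canceled)),
        // Old runtime without the API: fall back to the default parameters.
        Ok(Err(RuntimeApiError::NotSupported { .. })) => Ok(ExecutorParams::default()),
        // Any other runtime failure is propagated unchanged.
        Ok(Err(err)) => Err(Error::RuntimeApi(err)),
        // The API exists but storage has no entry for this session.
        Ok(Ok(None)) => Err(Error::DataNotAvailable),
        Ok(Ok(Some(params))) => Ok(params),
    }
}
```

The arm order matters: the `NotSupported` arm must precede the catch-all `Ok(Err(err))` arm, otherwise old runtimes would be reported as errors instead of receiving defaults.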
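
`signing_key_and_index` walks the validator list in order and returns the first public key whose private key the local keystore holds, together with its position in the list. A synchronous sketch of that search is below. It assumes a hypothetical keystore modelled as a `HashSet` of public keys and uses local stand-in types, whereas the real function queries `CryptoStore::has_keys` asynchronously.

```rust
use std::collections::HashSet;

/// Hypothetical stand-ins: public keys are plain byte arrays, and the
/// keystore is just the set of public keys whose secrets we hold.
type ValidatorId = [u8; 4];

#[derive(Debug, Clone, Copy, PartialEq)]
struct ValidatorIndex(u32);

/// Returns the first validator we can sign for, with its index in `validators`.
fn signing_key_and_index(
    validators: &[ValidatorId],
    keystore: &HashSet<ValidatorId>,
) -> Option<(ValidatorId, ValidatorIndex)> {
    validators
        .iter()
        .enumerate()
        .find(|(_, v)| keystore.contains(*v))
        .map(|(i, v)| (*v, ValidatorIndex(i as u32)))
}
```

The index returned is the position in the validator list, which is why the real `Validator::construct` can store it directly as the local `ValidatorIndex`.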
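
`find_validator_group` is a linear scan over the groups, such as those returned by `request_validator_groups`. An equivalent formulation with `Iterator::position` is sketched below, using hypothetical local newtypes in place of the `polkadot_primitives` `ValidatorIndex` and `GroupIndex`.

```rust
/// Hypothetical stand-ins for the `polkadot_primitives` newtypes.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ValidatorIndex(u32);

#[derive(Debug, Clone, Copy, PartialEq)]
struct GroupIndex(u32);

/// Same result as `enumerate().find_map(...)`: the index of the first group
/// containing `index`, or `None` if no group does.
fn find_validator_group(groups: &[Vec<ValidatorIndex>], index: ValidatorIndex) -> Option<GroupIndex> {
    groups
        .iter()
        .position(|g| g.contains(&index))
        .map(|i| GroupIndex(i as u32))
}
```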
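
`choose_random_subset_with_rng` moves priority elements to the front, keeps only them if there are already at least `min`, and otherwise shuffles the remaining elements and truncates to `min`. The std-only sketch below follows that rule but swaps in a hand-written in-place partition for `itertools::partition` and a Fisher-Yates shuffle driven by a hypothetical xorshift generator for `rand::seq::SliceRandom::shuffle`. It is for illustration only and not suitable where real randomness is required.

```rust
/// Moves every element satisfying `pred` to the front and returns how many
/// there are (the order of the other elements is not preserved).
fn partition_in_place<T>(v: &mut [T], mut pred: impl FnMut(&T) -> bool) -> usize {
    let mut split = 0;
    for j in 0..v.len() {
        if pred(&v[j]) {
            v.swap(split, j);
            split += 1;
        }
    }
    split
}

/// Fisher-Yates shuffle driven by `next_random`. The modulo step has a slight
/// bias, which is acceptable for this illustration.
fn shuffle<T>(v: &mut [T], next_random: &mut impl FnMut() -> u64) {
    for i in (1..v.len()).rev() {
        let j = (next_random() % (i as u64 + 1)) as usize;
        v.swap(i, j);
    }
}

/// Hypothetical xorshift64 generator; NOT cryptographically secure.
fn xorshift64(mut state: u64) -> impl FnMut() -> u64 {
    assert!(state != 0, "xorshift state must be non-zero");
    move || {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        state
    }
}

/// Keeps all priority elements; if there are fewer than `min` of them, tops
/// the vector up to `min` with randomly chosen non-priority elements.
fn choose_random_subset_with<T>(
    is_priority: impl FnMut(&T) -> bool,
    v: &mut Vec<T>,
    next_random: &mut impl FnMut() -> u64,
    min: usize,
) {
    let i = partition_in_place(v, is_priority);
    if i >= min || v.len() <= i {
        // Enough priority elements, or nothing else to choose from.
        v.truncate(i);
        return;
    }
    shuffle(&mut v[i..], next_random);
    // A no-op if `v` has fewer than `min` elements.
    v.truncate(min);
}
```

Note that `min` is a target rather than a cap: when more than `min` elements are priority, all of them are kept.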