// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.
//!
//! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems.
#![warn(missing_docs)]
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::pin_project;
use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
};
use sp_core::{
traits::SpawnNamed,
Public
};
use sp_application_crypto::AppKey;
use sp_keystore::{
CryptoStore,
SyncCryptoStorePtr,
Error as KeystoreError,
};
use std::{
collections::{HashMap, hash_map::Entry},
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
task::{Poll, Context},
time::Duration,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;
pub mod validator_discovery;
/// Items re-exported so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
    pub use polkadot_node_subsystem::{SpawnedSubsystem, Subsystem, SubsystemContext};
    pub use sp_core::traits::SpawnNamed;
}
/// How long a job is given to finish on its own after it has been sent the stop
/// signal, before it is hard-aborted.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_millis(1_000);
/// Buffer capacity of the channels that carry messages to and from each individual job.
pub const JOB_CHANNEL_CAPACITY: usize = 64;
/// Utility errors
#[derive(Debug, Error)]
pub enum Error {
/// Attempted to send or receive on a oneshot channel which had been canceled
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
/// Attempted to send on a MPSC channel which has been canceled
#[error(transparent)]
Mpsc(#[from] mpsc::SendError),
/// A subsystem error
#[error(transparent)]
Subsystem(#[from] SubsystemError),
/// An error in the Runtime API.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// The type system wants this even though it doesn't make sense
#[error(transparent)]
Infallible(#[from] std::convert::Infallible),
/// Attempted to convert from an AllMessages to a FromJob, and failed.
#[error("AllMessage not relevant to Job")]
SenderConversion(String),
/// The local node is not a validator.
#[error("Node is not a validator")]
NotAValidator,
/// Already forwarding errors to another sender
#[error("AlreadyForwarding")]
AlreadyForwarding,
}
/// A type alias for Runtime API receivers.
///
/// The request helpers in this file create one oneshot channel per request, pass the
/// sending half along inside the request and hand the receiving half back to the caller.
pub type RuntimeApiReceiver = oneshot::Receiver>;
/// Request some data from the `RuntimeApi`.
///
/// A fresh oneshot channel is created and its sending half is given to `request_builder`
/// to construct the `RuntimeApiRequest`. The request is wrapped in
/// `AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, ..))`, converted with
/// `try_into` and sent on `sender`. On success the receiving half of the oneshot channel
/// is returned; the caller still has to await it to obtain the response.
///
/// # Errors
///
/// - `Error::SenderConversion` if the `try_into` conversion of the message fails. The
///   payload is the `Debug` rendering of the conversion error.
/// - `Error::Mpsc` if sending on `sender` fails.
pub async fn request_from_runtime(
parent: Hash,
sender: &mut mpsc::Sender,
request_builder: RequestBuilder,
) -> Result, Error>
where
RequestBuilder: FnOnce(RuntimeApiSender) -> RuntimeApiRequest,
FromJob: TryFrom,
>::Error: std::fmt::Debug,
{
// `tx` travels inside the request; `rx` is what the caller gets back
let (tx, rx) = oneshot::channel();
sender
.send(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
// keep the conversion error's `Debug` output as the error payload
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
Ok(rx)
}
/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
///
/// Each input line has the shape
/// `fn <name>(<extra params>) -> <return type>; <RuntimeApiRequest variant>;`.
/// For every line a `pub async fn <name>` is generated which takes `parent`, the extra
/// parameters and finally `sender`, and forwards to `request_from_runtime`, building
/// `RuntimeApiRequest::<variant>` from the extra parameters followed by the oneshot sender.
macro_rules! specialize_requests {
// expand return type name for documentation purposes
// (a single declaration: re-invoke with the stringified variant name as `named` doc text)
(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
specialize_requests!{
named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
}
};
// create a single specialized request function
(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
// the three `#[doc]` attributes together form the generated doc comment
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime"]
pub async fn $func_name(
parent: Hash,
$(
$param_name: $param_ty,
)*
sender: &mut mpsc::Sender,
) -> Result, Error>
where
FromJob: TryFrom,
>::Error: std::fmt::Debug,
{
// the extra parameters come first, the oneshot sender `tx` is always the last argument
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
}
};
// recursive decompose
// (two or more declarations: expand the first one, then recurse on the remainder)
(
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
$(
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
)+
) => {
specialize_requests!{
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
}
specialize_requests!{
$(
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
)+
}
};
}
// Generates one `request_*` function per line below. Each line reads
// `fn <name>(<extra params>) -> <return type>; <RuntimeApiRequest variant>;`
// and the generated function sends its request through an `mpsc::Sender`.
specialize_requests! {
fn request_validators() -> Vec; Validators;
fn request_validator_groups() -> (Vec>, GroupRotationInfo); ValidatorGroups;
fn request_availability_cores() -> Vec; AvailabilityCores;
fn request_full_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; FullValidationData;
fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; PersistedValidationData;
fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; ValidationCode;
fn request_candidate_pending_availability(para_id: ParaId) -> Option; CandidatePendingAvailability;
fn request_candidate_events() -> Vec; CandidateEvents;
}
/// Request some data from the `RuntimeApi` via a SubsystemContext.
///
/// Works like `request_from_runtime`, except that the message is delivered with
/// `ctx.send_message` instead of an `mpsc::Sender`: a oneshot channel is created, its
/// sending half is passed to `request_builder`, the resulting request is wrapped in
/// `AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, ..))` and sent, and the
/// receiving half is returned on success.
///
/// # Errors
///
/// - `Error::SenderConversion` if the `try_into` conversion of the message fails.
/// - Whatever error `ctx.send_message` reports, converted into `Error` by `?`.
async fn request_from_runtime_ctx(
parent: Hash,
ctx: &mut Context,
request_builder: RequestBuilder,
) -> Result, Error>
where
RequestBuilder: FnOnce(RuntimeApiSender) -> RuntimeApiRequest,
Context: SubsystemContext,
{
// `tx` travels inside the request; `rx` is what the caller gets back
let (tx, rx) = oneshot::channel();
ctx
.send_message(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
// keep the conversion error's `Debug` output as the error payload
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
Ok(rx)
}
/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
///
/// Each input line has the shape
/// `fn <name>(<extra params>) -> <return type>; <RuntimeApiRequest variant>;`.
/// For every line a `pub async fn <name>` is generated which takes `parent`, the extra
/// parameters and finally `ctx`, and forwards to `request_from_runtime_ctx`, building
/// `RuntimeApiRequest::<variant>` from the extra parameters followed by the oneshot sender.
macro_rules! specialize_requests_ctx {
// expand return type name for documentation purposes
// (a single declaration: re-invoke with the stringified variant name as `named` doc text)
(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
specialize_requests_ctx!{
named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
}
};
// create a single specialized request function
(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
// the three `#[doc]` attributes together form the generated doc comment
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime via a `SubsystemContext`"]
pub async fn $func_name(
parent: Hash,
$(
$param_name: $param_ty,
)*
ctx: &mut Context,
) -> Result, Error> {
// the extra parameters come first, the oneshot sender `tx` is always the last argument
request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
}
};
// recursive decompose
// (two or more declarations: expand the first one, then recurse on the remainder)
(
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
$(
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
)+
) => {
specialize_requests_ctx!{
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
}
specialize_requests_ctx!{
$(
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
)+
}
};
}
// Generates one `request_*_ctx` function per line below. Each line reads
// `fn <name>(<extra params>) -> <return type>; <RuntimeApiRequest variant>;`
// and the generated function sends its request through a `SubsystemContext`.
specialize_requests_ctx! {
fn request_validators_ctx() -> Vec; Validators;
fn request_validator_groups_ctx() -> (Vec>, GroupRotationInfo); ValidatorGroups;
fn request_availability_cores_ctx() -> Vec; AvailabilityCores;
fn request_full_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; FullValidationData;
fn request_persisted_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; PersistedValidationData;
fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild;
fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option; ValidationCode;
fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option; CandidatePendingAvailability;
fn request_candidate_events_ctx() -> Vec; CandidateEvents;
}
/// From the given set of validators, find the first key we can sign with, if any.
///
/// The validators are visited in slice order. For each one the keystore is asked, via
/// `CryptoStore::has_keys`, whether it holds the key given by the validator's raw public
/// bytes and the `ValidatorId::ID` key type. A clone of the first validator id for which
/// that is the case is returned; `None` is returned if there is no such validator
/// (including when `validators` is empty).
pub async fn signing_key(validators: &[ValidatorId], keystore: SyncCryptoStorePtr) -> Option {
for v in validators.iter() {
// one keystore query per candidate; stop at the first hit
if CryptoStore::has_keys(&*keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await {
return Some(v.clone());
}
}
None
}
/// Local validator information
///
/// It can be created if the local node is a validator in the context of a particular
/// relay chain block.
#[derive(Debug)]
pub struct Validator {
/// Signing context used when signing payloads and when checking signatures.
signing_context: SigningContext,
/// The validator key selected by `signing_key`; returned (cloned) by `id()`.
key: ValidatorId,
/// Position of `key` within the validator list that was passed to `construct`.
index: ValidatorIndex,
}
impl Validator {
/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
///
/// Requests the validator set and the session index for the child of `parent` from the
/// runtime through `sender`, builds a `SigningContext` from the session index and
/// `parent`, and then delegates to [`Validator::construct`].
///
/// # Errors
///
/// Any error from sending the requests or from awaiting their responses is propagated
/// with `?`, as is any error returned by `construct`.
pub async fn new(
parent: Hash,
keystore: SyncCryptoStorePtr,
mut sender: mpsc::Sender,
) -> Result
where
FromJob: TryFrom,
>::Error: std::fmt::Debug,
{
// Note: request_validators and request_session_index_for_child do not and cannot
// run concurrently: they both have a mutable handle to the same sender.
// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
let (validators, session_index) = futures::try_join!(
request_validators(parent, &mut sender).await?,
request_session_index_for_child(parent, &mut sender).await?,
)?;
// each received value is itself fallible, hence the additional `?` on both
let signing_context = SigningContext {
session_index: session_index?,
parent_hash: parent,
};
let validators = validators?;
Self::construct(&validators, signing_context, keystore).await
}
/// Construct a validator instance without performing runtime fetches.
///
/// This can be useful if external code also needs the same data.
///
/// The key is chosen with `signing_key`, and the validator index is the position of
/// that key within `validators`.
///
/// # Errors
///
/// Returns `Error::NotAValidator` if `signing_key` finds no usable key.
pub async fn construct(
validators: &[ValidatorId],
signing_context: SigningContext,
keystore: SyncCryptoStorePtr,
) -> Result {
let key = signing_key(validators, keystore).await.ok_or(Error::NotAValidator)?;
// `key` is a clone of an element of `validators`, so the search below finds it.
// NOTE(review): the `as ValidatorIndex` cast would silently truncate if the position
// did not fit the target type; the width of `ValidatorIndex` is not visible here.
let index = validators
.iter()
.enumerate()
.find(|(_, k)| k == &&key)
.map(|(idx, _)| idx as ValidatorIndex)
.expect("signing_key would have already returned NotAValidator if the item we're searching for isn't in this list; qed");
Ok(Validator {
signing_context,
key,
index,
})
}
/// Get this validator's id.
///
/// Returns a clone of the stored key.
pub fn id(&self) -> ValidatorId {
self.key.clone()
}
/// Get this validator's local index.
pub fn index(&self) -> ValidatorIndex {
self.index
}
/// Get the current signing context.
pub fn signing_context(&self) -> &SigningContext {
&self.signing_context
}
/// Sign a payload with this validator
///
/// Delegates to `Signed::sign`, passing the keystore, the payload and this validator's
/// signing context, index and key.
pub async fn sign, RealPayload: Encode>(
&self,
keystore: SyncCryptoStorePtr,
payload: Payload,
) -> Result, KeystoreError> {
Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
}
/// Validate the payload with this validator
///
/// Validation can only succeed if `signed.validator_index() == self.index()`.
/// Normally, this will always be the case for a properly operating program,
/// but it's double-checked here anyway.
///
/// Returns `Err(())` on an index mismatch; otherwise returns the result of checking the
/// signature against this validator's signing context and id.
pub fn check_payload, RealPayload: Encode>(
&self,
signed: Signed,
) -> Result<(), ()> {
if signed.validator_index() != self.index {
return Err(());
}
signed.check_signature(&self.signing_context, &self.id())
}
}
/// ToJob is expected to be an enum declaring the set of messages of interest to a particular job.
///
/// Normally, this will be some subset of `AllMessages`, and a `Stop` variant.
pub trait ToJobTrait: TryFrom {
/// The `Stop` variant of the ToJob enum.
///
/// This is the message sent to a job when it is asked to stop gracefully.
const STOP: Self;
/// If the message variant contains its relay parent, return it here
fn relay_parent(&self) -> Option;
}
/// Guard around an abort handle: dropping the guard aborts whatever the handle controls.
struct AbortOnDrop(future::AbortHandle);

impl Drop for AbortOnDrop {
    /// Trigger the abort on the wrapped handle.
    fn drop(&mut self) {
        let Self(handle) = self;
        handle.abort();
    }
}
/// A JobHandle manages a particular job for a subsystem.
struct JobHandle {
/// Aborts the job's future when this handle is dropped.
_abort_handle: AbortOnDrop,
/// Channel on which messages are sent to the job.
to_job: mpsc::Sender,
/// Resolves once the spawned job future has completed (or was aborted).
finished: oneshot::Receiver<()>,
}
impl JobHandle {
/// Send a message to the job.
///
/// # Errors
///
/// A failure to send on the job's channel is converted into `Error` and returned.
async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
self.to_job.send(msg).await.map_err(Into::into)
}
}
impl JobHandle {
/// Stop this job gracefully.
///
/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
///
/// The handle is consumed. On every path it is dropped when this function returns,
/// and dropping it triggers the abort through `AbortOnDrop`.
async fn stop(mut self) {
// we don't actually care if the message couldn't be sent
if self.to_job.send(ToJob::STOP).await.is_err() {
return;
}
// wait for whichever comes first: the grace period elapsing or the job finishing
let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
future::select(stop_timer, self.finished).await;
}
}
/// This module reexports Prometheus types and defines the [`Metrics`] trait.
pub mod metrics {
/// Reexport Prometheus types.
pub use substrate_prometheus_endpoint as prometheus;
/// Subsystem- or job-specific Prometheus metrics.
///
/// Usually implemented as a wrapper for `Option`
/// to ensure `Default` bounds or as a dummy type ().
/// Prometheus metrics internally hold an `Arc` reference, so cloning them is fine.
pub trait Metrics: Default + Clone {
/// Try to register metrics in the Prometheus registry.
fn try_register(registry: &prometheus::Registry) -> Result;
/// Convenience method to register metrics in the optional Prometheus registry.
///
/// If no registry is provided, returns `Default::default()`. Otherwise, returns the same
/// thing that `try_register` does.
fn register(registry: Option<&prometheus::Registry>) -> Result {
match registry {
None => Ok(Self::default()),
Some(registry) => Self::try_register(registry),
}
}
}
// dummy impl
// (the unit type registers nothing and always succeeds)
impl Metrics for () {
fn try_register(_registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
Ok(())
}
}
}
/// This trait governs jobs.
///
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other
/// subsystems.
pub trait JobTrait: Unpin {
/// Message type to the job. Typically a subset of AllMessages.
type ToJob: 'static + ToJobTrait + Send;
/// Message type from the job. Typically a subset of AllMessages.
type FromJob: 'static + Into + Send;
/// Job runtime error.
type Error: 'static + std::error::Error + Send;
/// Extra arguments this job needs to run properly.
///
/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
type RunArgs: 'static + Send;
/// Subsystem-specific Prometheus metrics.
///
/// Jobs spawned by one subsystem should share the same
/// instance of metrics (use `.clone()`).
/// The `delegated_subsystem!` macro should take care of this.
type Metrics: 'static + metrics::Metrics + Send;
/// Name of the job, i.e. `CandidateBackingJob`
///
/// Used as the task name when the job is spawned and in log messages about the job.
const NAME: &'static str;
/// Run a job for the parent block indicated
///
/// `receiver` yields the messages sent to the job and `sender` carries the messages
/// the job emits. The returned future is what gets spawned for the job; an `Err`
/// output is treated as the job having failed.
fn run(
parent: Hash,
run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver,
sender: mpsc::Sender,
) -> Pin> + Send>>;
}
/// Error which can be returned by the jobs manager
///
/// Wraps the utility error type and the job-specific error
#[derive(Debug, Error)]
pub enum JobsError {
/// utility error
///
/// Wraps this module's `Error`, which is exposed as the error source.
#[error("Utility")]
Utility(#[source] Error),
/// internal job error
///
/// Wraps the error returned by the job itself, which is exposed as the error source.
#[error("Internal")]
Job(#[source] JobError),
}
/// Jobs manager for a subsystem
///
/// - Spawns new jobs for a given relay-parent on demand.
/// - Closes old jobs for a given relay-parent on demand.
/// - Dispatches messages to the appropriate job for a given relay-parent.
/// - When dropped, aborts all remaining jobs.
/// - implements `Stream`, collecting all messages from subordinate jobs.
#[pin_project]
pub struct Jobs {
/// Spawner on which the job futures are spawned.
spawner: Spawner,
/// Handles of the currently tracked jobs, keyed by the parent hash they were spawned for.
running: HashMap>,
/// Receiving ends of the per-job channels carrying messages coming from the jobs.
outgoing_msgs: StreamUnordered>,
/// Marker tying this manager to its job type; holds no data.
#[pin]
job: std::marker::PhantomData,
/// Optional channel to which job errors are forwarded; `None` until `forward_errors` is called.
errors: Option, JobsError)>>,
}
impl Jobs {
/// Create a new Jobs manager which handles spawning appropriate jobs.
///
/// The manager starts without running jobs and without an error-forwarding channel.
pub fn new(spawner: Spawner) -> Self {
Self {
spawner,
running: HashMap::new(),
outgoing_msgs: StreamUnordered::new(),
job: std::marker::PhantomData,
errors: None,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// The channel is cloned into each job at spawn time, so only jobs spawned after
/// this call forward their errors. Errors are still logged when forwarding is set up.
///
/// Errors if the error channel already exists.
///
/// # Errors
///
/// Returns `Error::AlreadyForwarding` if a channel was registered before.
pub fn forward_errors(
&mut self,
tx: mpsc::Sender<(Option, JobsError)>,
) -> Result<(), Error> {
if self.errors.is_some() {
return Err(Error::AlreadyForwarding);
}
self.errors = Some(tx);
Ok(())
}
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
///
/// Creates the channels to and from the job, wraps `Job::run` in an abortable future
/// that logs (and optionally forwards) a job error, spawns it under `Job::NAME`, and
/// records a handle for it in `running`.
///
/// If a handle is already stored for `parent_hash` it is replaced; dropping the old
/// handle aborts the old job.
///
/// NOTE(review): as written this function always returns `Ok(())`.
fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> {
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (finished_tx, finished) = oneshot::channel();
// snapshot of the error channel as it is right now
let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(parent_hash, run_args, metrics, to_job_rx, from_job_tx).await {
log::error!(
"{}({}) finished with an error {:?}",
Job::NAME,
parent_hash,
e,
);
if let Some(mut err_tx) = err_tx {
// if we can't send the notification of error on the error channel, then
// there's no point trying to propagate this error onto the channel too
// all we can do is warn that error propagation has failed
if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await {
log::warn!("failed to forward error: {:?}", e);
}
}
}
});
let future = async move {
// job errors are already handled within the future, meaning
// that any errors here are due to the abortable mechanism.
// failure to abort isn't of interest.
let _ = future.await;
// transmission failure here is only possible if the receiver is closed,
// which means the handle is dropped, which means we don't care anymore
let _ = finished_tx.send(());
};
self.spawner.spawn(Job::NAME, future.boxed());
// messages coming from the job are collected through `outgoing_msgs`
self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle {
_abort_handle: AbortOnDrop(abort_handle),
to_job: to_job_tx,
finished,
};
self.running.insert(parent_hash, handle);
Ok(())
}
/// Stop the job associated with this `parent_hash`.
///
/// The handle is removed from `running` and stopped gracefully. Does nothing if no
/// job is tracked for `parent_hash`.
pub async fn stop_job(&mut self, parent_hash: Hash) {
if let Some(handle) = self.running.remove(&parent_hash) {
handle.stop().await;
}
}
/// Send a message to the appropriate job for this `parent_hash`.
///
/// If no job is tracked for `parent_hash` the message is dropped silently. If sending
/// fails, the job's handle is removed from `running`.
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) {
if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
if job.get_mut().send_msg(msg).await.is_err() {
log::debug!("failed to send message to job ({}), will remove it", Job::NAME);
// removing the entry drops the handle, which aborts the job
job.remove();
}
}
}
}
impl Stream for Jobs
where
Spawner: SpawnNamed,
Job: JobTrait,
{
type Item = Job::FromJob;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll