// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements the Runtime API Subsystem
//!
//! This provides a clean, ownerless wrapper around the parachain-related runtime APIs. This crate
//! can also be used to cache responses from heavy runtime APIs.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use polkadot_subsystem::{
Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext,
FromOverseer, OverseerSignal,
messages::{
RuntimeApiMessage, RuntimeApiRequest as Request,
},
errors::RuntimeApiError,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
use std::sync::Arc;
use sp_api::{ProvideRuntimeApi};
use futures::prelude::*;
/// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem {
client: Arc,
metrics: Metrics,
}
impl RuntimeApiSubsystem {
/// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Arc, metrics: Metrics) -> Self {
RuntimeApiSubsystem { client, metrics }
}
}
impl Subsystem for RuntimeApiSubsystem where
Client: ProvideRuntimeApi + Send + 'static + Sync,
Client::Api: ParachainHost,
Context: SubsystemContext
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
future: run(ctx, self).boxed(),
name: "runtime-api-subsystem",
}
}
}
async fn run(
mut ctx: impl SubsystemContext,
subsystem: RuntimeApiSubsystem,
) -> SubsystemResult<()> where
Client: ProvideRuntimeApi,
Client::Api: ParachainHost,
{
loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
&*subsystem.client,
&subsystem.metrics,
relay_parent,
request,
),
}
}
}
}
fn make_runtime_api_request(
client: &Client,
metrics: &Metrics,
relay_parent: Hash,
request: Request,
) where
Client: ProvideRuntimeApi,
Client::Api: ParachainHost,
{
macro_rules! query {
($api_name:ident ($($param:expr),*), $sender:expr) => {{
let sender = $sender;
let api = client.runtime_api();
let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*)
.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
metrics.on_request(res.is_ok());
let _ = sender.send(res);
}}
}
match request {
Request::Validators(sender) => query!(validators(), sender),
Request::ValidatorGroups(sender) => query!(validator_groups(), sender),
Request::AvailabilityCores(sender) => query!(availability_cores(), sender),
Request::PersistedValidationData(para, assumption, sender) =>
query!(persisted_validation_data(para, assumption), sender),
Request::FullValidationData(para, assumption, sender) =>
query!(full_validation_data(para, assumption), sender),
Request::CheckValidationOutputs(para, commitments, sender) =>
query!(check_validation_outputs(para, commitments), sender),
Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender),
Request::ValidationCode(para, assumption, sender) =>
query!(validation_code(para, assumption), sender),
Request::HistoricalValidationCode(para, at, sender) =>
query!(historical_validation_code(para, at), sender),
Request::CandidatePendingAvailability(para, sender) =>
query!(candidate_pending_availability(para), sender),
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender),
}
}
#[derive(Clone)]
struct MetricsInner {
chain_api_requests: prometheus::CounterVec,
}
/// Runtime API metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option);
impl Metrics {
    /// Count one served request under the label matching its outcome.
    ///
    /// Does nothing when no collectors are registered (the inner value is `None`).
    fn on_request(&self, succeeded: bool) {
        let inner = match self.0.as_ref() {
            Some(inner) => inner,
            None => return,
        };
        let outcome = if succeeded { "succeeded" } else { "failed" };
        inner.chain_api_requests.with_label_values(&[outcome]).inc();
    }
}
impl metrics::Metrics for Metrics {
    /// Create the request counter and register it with `registry`.
    ///
    /// The counter is named `parachain_runtime_api_requests_total` and has a single
    /// label, `success`.
    ///
    /// # Errors
    ///
    /// Propagates the error if the counter cannot be constructed or registered.
    // NOTE(review): the `Result` type arguments were reconstructed; confirm the error type
    // against the `metrics::Metrics` trait definition in `polkadot_node_subsystem_util`.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let metrics = MetricsInner {
            chain_api_requests: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "parachain_runtime_api_requests_total",
                        "Number of Runtime API requests served.",
                    ),
                    &["success"],
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
use std::collections::{HashMap, BTreeMap};
use futures::channel::oneshot;
#[derive(Default, Clone)]
struct MockRuntimeApi {
validators: Vec,
validator_groups: Vec>,
availability_cores: Vec,
validation_data: HashMap,
session_index_for_child: SessionIndex,
validation_code: HashMap,
historical_validation_code: HashMap>,
validation_outputs_results: HashMap,
candidate_pending_availability: HashMap,
candidate_events: Vec,
dmq_contents: HashMap>,
hrmp_channels: HashMap>>,
}
impl ProvideRuntimeApi for MockRuntimeApi {
type Api = Self;
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
self.clone().into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost for MockRuntimeApi {
type Error = String;
fn validators(&self) -> Vec {
self.validators.clone()
}
fn validator_groups(&self) -> (Vec>, GroupRotationInfo) {
(
self.validator_groups.clone(),
GroupRotationInfo {
session_start_block: 1,
group_rotation_frequency: 100,
now: 10,
},
)
}
fn availability_cores(&self) -> Vec {
self.availability_cores.clone()
}
fn persisted_validation_data(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option {
self.validation_data.get(¶).map(|l| l.persisted.clone())
}
fn full_validation_data(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option {
self.validation_data.get(¶).map(|l| l.clone())
}
fn check_validation_outputs(
&self,
para_id: ParaId,
_commitments: polkadot_primitives::v1::ValidationOutputs,
) -> bool {
self.validation_outputs_results
.get(¶_id)
.cloned()
.expect(
"`check_validation_outputs` called but the expected result hasn't been supplied"
)
}
fn session_index_for_child(&self) -> SessionIndex {
self.session_index_for_child.clone()
}
fn validation_code(
&self,
para: ParaId,
_assumption: OccupiedCoreAssumption,
) -> Option {
self.validation_code.get(¶).map(|c| c.clone())
}
fn historical_validation_code(
&self,
para: ParaId,
at: BlockNumber,
) -> Option {
self.historical_validation_code.get(¶).and_then(|h_code| {
h_code.iter()
.take_while(|(changed_at, _)| changed_at <= &at)
.last()
.map(|(_, code)| code.clone())
})
}
fn candidate_pending_availability(
&self,
para: ParaId,
) -> Option {
self.candidate_pending_availability.get(¶).map(|c| c.clone())
}
fn candidate_events(&self) -> Vec {
self.candidate_events.clone()
}
fn validator_discovery(ids: Vec) -> Vec