initial prometheus metrics (#1536)

* service-new: cosmetic changes

* overseer: draft of prometheus metrics

* metrics: update active_leaves metrics

* metrics: extract into functions

* metrics: resolve XXX

* metrics: it's ugly, but it works

* Bump Substrate

* metrics: move a bunch of code around

* Bump Substrate again

* metrics: fix a warning

* fix a warning in runtime

* metrics: statements signed

* metrics: statements impl RegisterMetrics

* metrics: refactor Metrics trait

* metrics: add Metrics assoc type to JobTrait

* metrics: move Metrics trait to util

* metrics: fix overseer

* metrics: fix backing

* metrics: fix candidate validation

* metrics: derive Default

* metrics: docs

* metrics: add stubs for other subsystems

* metrics: add more stubs and fix compilation

* metrics: fix doctest

* metrics: move to subsystem

* metrics: fix candidate validation

* metrics: bitfield signing

* metrics: av store

* metrics: chain API

* metrics: runtime API

* metrics: stub for avad

* metrics: candidates seconded

* metrics: ok I gave up

* metrics: provisioner

* metrics: remove a clone by requiring Metrics: Sync

* metrics: YAGNI

* metrics: remove another TODO

* metrics: for later

* metrics: add parachain_ prefix

* metrics: s/signed_statement/signed_statements

* utils: add a comment for job metrics

* metrics: address review comments

* metrics: oops

* metrics: make sure to save files before commit 😅

* use _total suffix for requests metrics

Co-authored-by: Max Inden <mail@max-inden.de>

* metrics: add tests for overseer

* update Cargo.lock

* overseer: add a test for CollationGeneration

* collation-generation: impl metrics

* collation-generation: use kebab-case for name

* collation-generation: add a constructor

Co-authored-by: Gav Wood <gavin@parity.io>
Co-authored-by: Ashley Ruglys <ashley.ruglys@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Andronik Ordian
2020-08-18 11:18:54 +02:00
committed by GitHub
parent ae37a00c17
commit e7ead40255
20 changed files with 742 additions and 106 deletions
+55 -11
View File
@@ -34,6 +34,7 @@ use polkadot_primitives::v1::{
};
use polkadot_subsystem::{
FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
metrics::{self, prometheus},
};
use polkadot_subsystem::messages::AvailabilityStoreMessage;
@@ -59,6 +60,7 @@ enum Error {
/// An implementation of the Availability Store subsystem.
pub struct AvailabilityStoreSubsystem {
/// Key-value database handle; handed to `process_message` as its `db` argument.
inner: Arc<dyn KeyValueDB>,
/// Metrics handle passed along to `process_message` for every incoming message.
metrics: Metrics,
}
fn available_data_key(candidate_hash: &Hash) -> Vec<u8> {
@@ -85,7 +87,7 @@ pub struct Config {
impl AvailabilityStoreSubsystem {
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
pub fn new_on_disk(config: Config) -> io::Result<Self> {
pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);
if let Some(cache_size) = config.cache_size {
@@ -106,6 +108,7 @@ impl AvailabilityStoreSubsystem {
Ok(Self {
inner: Arc::new(db),
metrics,
})
}
@@ -113,6 +116,7 @@ impl AvailabilityStoreSubsystem {
/// Build a subsystem around the given key-value database, with metrics disabled.
fn new_in_memory(inner: Arc<dyn KeyValueDB>) -> Self {
    // No collectors are attached, so every metrics hook on this instance is a no-op.
    let metrics = Metrics(None);
    Self { inner, metrics }
}
}
@@ -130,7 +134,7 @@ where
Ok(FromOverseer::Signal(Conclude)) => break,
Ok(FromOverseer::Signal(_)) => (),
Ok(FromOverseer::Communication { msg }) => {
process_message(&subsystem.inner, msg)?;
process_message(&subsystem.inner, &subsystem.metrics, msg)?;
}
Err(_) => break,
}
@@ -142,7 +146,7 @@ where
Ok(())
}
fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> Result<(), Error> {
fn process_message(db: &Arc<dyn KeyValueDB>, metrics: &Metrics, msg: AvailabilityStoreMessage) -> Result<(), Error> {
use AvailabilityStoreMessage::*;
match msg {
QueryAvailableData(hash, tx) => {
@@ -152,10 +156,10 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> R
tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?;
}
QueryChunk(hash, id, tx) => {
tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?;
tx.send(get_chunk(db, &hash, id, metrics)?).map_err(|_| oneshot::Canceled)?;
}
QueryChunkAvailability(hash, id, tx) => {
tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?;
tx.send(get_chunk(db, &hash, id, metrics)?.is_some()).map_err(|_| oneshot::Canceled)?;
}
StoreChunk(hash, id, chunk, tx) => {
match store_chunk(db, &hash, id, chunk) {
@@ -169,7 +173,7 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> R
}
}
StoreAvailableData(hash, id, n_validators, av_data, tx) => {
match store_available_data(db, &hash, id, n_validators, av_data) {
match store_available_data(db, &hash, id, n_validators, av_data, metrics) {
Err(e) => {
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e);
@@ -194,11 +198,12 @@ fn store_available_data(
id: Option<ValidatorIndex>,
n_validators: u32,
available_data: AvailableData,
metrics: &Metrics,
) -> Result<(), Error> {
let mut tx = DBTransaction::new();
if let Some(index) = id {
let chunks = get_chunks(&available_data, n_validators as usize)?;
let chunks = get_chunks(&available_data, n_validators as usize, metrics)?;
store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?;
}
@@ -231,7 +236,7 @@ fn store_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, _n_validators: u
Ok(())
}
fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32)
fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32, metrics: &Metrics)
-> Result<Option<ErasureChunk>, Error>
{
if let Some(chunk) = query_inner(
@@ -242,7 +247,7 @@ fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32)
}
if let Some(data) = available_data(db, candidate_hash) {
let mut chunks = get_chunks(&data.data, data.n_validators as usize)?;
let mut chunks = get_chunks(&data.data, data.n_validators as usize, metrics)?;
let desired_chunk = chunks.get(index as usize).cloned();
for chunk in chunks.drain(..) {
store_chunk(db, candidate_hash, data.n_validators, chunk)?;
@@ -271,6 +276,8 @@ impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
type Metrics = Metrics;
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move {
if let Err(e) = run(self, ctx).await {
@@ -285,8 +292,9 @@ impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
}
}
fn get_chunks(data: &AvailableData, n_validators: usize) -> Result<Vec<ErasureChunk>, Error> {
fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result<Vec<ErasureChunk>, Error> {
let chunks = erasure::obtain_chunks_v1(n_validators, data)?;
metrics.on_chunks_received(chunks.len());
let branches = erasure::branches(chunks.as_ref());
Ok(chunks
@@ -302,6 +310,41 @@ fn get_chunks(data: &AvailableData, n_validators: usize) -> Result<Vec<ErasureCh
)
}
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
/// Counter advanced by `Metrics::on_chunks_received` by the chunk count it is given.
received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
}
/// Availability metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), `on_chunks_received` does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Add `count` to the received-chunks counter.
    ///
    /// Does nothing when no collectors are attached. Should `count` not fit into a `u64`,
    /// the counter is advanced by zero rather than panicking.
    fn on_chunks_received(&self, count: usize) {
        use core::convert::TryFrom as _;

        let inner = match self.0.as_ref() {
            Some(inner) => inner,
            None => return,
        };
        // assume usize fits into u64
        let increment = u64::try_from(count).unwrap_or_default();
        inner.received_availability_chunks_total.inc_by(increment);
    }
}
impl metrics::Metrics for Metrics {
    /// Create the availability-store counter and register it with `registry`.
    ///
    /// Fails if the counter cannot be constructed or the registry rejects it.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let counter: prometheus::Counter<prometheus::U64> = prometheus::Counter::new(
            "parachain_received_availability_chunks_total",
            "Number of availability chunks received.",
        )?;
        let received_availability_chunks_total = prometheus::register(counter, registry)?;

        Ok(Metrics(Some(MetricsInner { received_availability_chunks_total })))
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -501,7 +544,8 @@ mod tests {
omitted_validation,
};
let chunks_expected = get_chunks(&available_data, n_validators as usize).unwrap();
let no_metrics = Metrics(None);
let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
+57 -3
View File
@@ -45,6 +45,7 @@ use polkadot_subsystem::{
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
RuntimeApiRequest,
},
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::{
self as util,
@@ -100,6 +101,7 @@ struct CandidateBackingJob {
reported_misbehavior_for: HashSet<ValidatorIndex>,
table: Table<TableContext>,
table_context: TableContext,
metrics: Metrics,
}
const fn group_quorum(n_validators: usize) -> usize {
@@ -432,6 +434,7 @@ impl CandidateBackingJob {
&candidate,
pov,
).await {
self.metrics.on_candidate_seconded();
self.seconded = Some(candidate_hash);
}
}
@@ -528,7 +531,9 @@ impl CandidateBackingJob {
}
fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
Some(self.table_context.validator.as_ref()?.sign(statement))
let signed = self.table_context.validator.as_ref()?.sign(statement);
self.metrics.on_statement_signed();
Some(signed)
}
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
@@ -672,12 +677,14 @@ impl util::JobTrait for CandidateBackingJob {
type FromJob = FromJob;
type Error = Error;
type RunArgs = KeyStorePtr;
type Metrics = Metrics;
const NAME: &'static str = "CandidateBackingJob";
fn run(
parent: Hash,
keystore: KeyStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
@@ -764,6 +771,7 @@ impl util::JobTrait for CandidateBackingJob {
reported_misbehavior_for: HashSet::new(),
table: Table::default(),
table_context,
metrics,
};
job.run_loop().await
@@ -772,7 +780,53 @@ impl util::JobTrait for CandidateBackingJob {
}
}
delegated_subsystem!(CandidateBackingJob(KeyStorePtr) <- ToJob as CandidateBackingSubsystem);
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
/// Counter incremented by `Metrics::on_statement_signed`.
signed_statements_total: prometheus::Counter<prometheus::U64>,
/// Counter incremented by `Metrics::on_candidate_seconded`.
candidates_seconded_total: prometheus::Counter<prometheus::U64>
}
/// Candidate backing metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), the `on_*` hooks do nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_statement_signed(&self) {
if let Some(metrics) = &self.0 {
metrics.signed_statements_total.inc();
}
}
fn on_candidate_seconded(&self) {
if let Some(metrics) = &self.0 {
metrics.candidates_seconded_total.inc();
}
}
}
impl metrics::Metrics for Metrics {
    /// Create both candidate-backing counters and register them with `registry`.
    ///
    /// The signed-statements counter is registered first. Any construction or
    /// registration failure is returned to the caller.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let signed_statements_total: prometheus::Counter<prometheus::U64> = prometheus::register(
            prometheus::Counter::new(
                "parachain_signed_statements_total",
                "Number of statements signed.",
            )?,
            registry,
        )?;
        let candidates_seconded_total: prometheus::Counter<prometheus::U64> = prometheus::register(
            prometheus::Counter::new(
                "parachain_candidates_seconded_total",
                "Number of candidates seconded.",
            )?,
            registry,
        )?;

        Ok(Metrics(Some(MetricsInner {
            signed_statements_total,
            candidates_seconded_total,
        })))
    }
}
delegated_subsystem!(CandidateBackingJob(KeyStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
#[cfg(test)]
mod tests {
@@ -904,7 +958,7 @@ mod tests {
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone());
let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone());
let subsystem = CandidateBackingSubsystem::run(context, keystore, Metrics(None), pool.clone());
let test_fut = test(TestHarness {
virtual_overseer,
@@ -29,6 +29,7 @@ use polkadot_node_subsystem::{
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage,
},
errors::RuntimeApiError,
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, ToJobTrait, Validator
@@ -252,11 +253,44 @@ async fn construct_availability_bitfield(
}
}
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
/// Counter incremented by `Metrics::on_bitfield_signed`.
bitfields_signed_total: prometheus::Counter<prometheus::U64>,
}
/// Bitfield signing metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), `on_bitfield_signed` does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Record that one bitfield was signed; a no-op without attached collectors.
    fn on_bitfield_signed(&self) {
        let inner = match self.0.as_ref() {
            Some(inner) => inner,
            None => return,
        };
        inner.bitfields_signed_total.inc();
    }
}
impl metrics::Metrics for Metrics {
    /// Create the bitfield-signing counter and register it with `registry`.
    ///
    /// Fails if the counter cannot be constructed or the registry rejects it.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let counter: prometheus::Counter<prometheus::U64> = prometheus::Counter::new(
            "parachain_bitfields_signed_total",
            "Number of bitfields signed.",
        )?;
        let bitfields_signed_total = prometheus::register(counter, registry)?;

        Ok(Metrics(Some(MetricsInner { bitfields_signed_total })))
    }
}
impl JobTrait for BitfieldSigningJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
type RunArgs = KeyStorePtr;
type Metrics = Metrics;
const NAME: &'static str = "BitfieldSigningJob";
@@ -264,6 +298,7 @@ impl JobTrait for BitfieldSigningJob {
fn run(
relay_parent: Hash,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
@@ -295,6 +330,7 @@ impl JobTrait for BitfieldSigningJob {
};
let signed_bitfield = validator.sign(bitfield);
metrics.on_bitfield_signed();
// make an anonymous scope to contain some use statements to simplify creating the outbound message
{
@@ -23,9 +23,11 @@
use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemResult,
FromOverseer, OverseerSignal,
};
use polkadot_subsystem::messages::{
AllMessages, CandidateValidationMessage, RuntimeApiMessage, ValidationFailed, RuntimeApiRequest,
messages::{
AllMessages, CandidateValidationMessage, RuntimeApiMessage,
ValidationFailed, RuntimeApiRequest,
},
metrics::{self, prometheus},
};
use polkadot_subsystem::errors::RuntimeApiError;
use polkadot_node_primitives::{ValidationResult, ValidationOutputs, InvalidCandidate};
@@ -45,13 +47,63 @@ use futures::prelude::*;
use std::sync::Arc;
const LOG_TARGET: &'static str = "candidate_validation";
/// The candidate validation subsystem.
pub struct CandidateValidationSubsystem<S>(S);
pub struct CandidateValidationSubsystem<S> {
/// Task spawner handed to `run` when the subsystem is started.
spawn: S,
/// Metrics handle handed to `run` when the subsystem is started.
metrics: Metrics,
}
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
/// Counter vector incremented by `Metrics::on_validation_event`, keyed by the outcome
/// of the validation request.
validation_requests: prometheus::CounterVec<prometheus::U64>,
}
/// Candidate validation metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), `on_validation_event` does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Count one validation request under the label value matching its outcome.
    ///
    /// `Ok(Valid)` is counted as `valid`, `Ok(Invalid)` as `invalid`, and `Err` as
    /// `failed`. Does nothing when no collectors are attached.
    fn on_validation_event(&self, event: &Result<ValidationResult, ValidationFailed>) {
        let inner = match &self.0 {
            Some(inner) => inner,
            None => return,
        };
        let outcome = match event {
            Ok(ValidationResult::Valid(_)) => "valid",
            Ok(ValidationResult::Invalid(_)) => "invalid",
            Err(_) => "failed",
        };
        inner.validation_requests.with_label_values(&[outcome]).inc();
    }
}
impl metrics::Metrics for Metrics {
    /// Create the validation-requests counter vector and register it with `registry`.
    ///
    /// The vector has a single label dimension, `validity`, whose values (`valid`,
    /// `invalid`, `failed`) are supplied by `Metrics::on_validation_event`.
    ///
    /// Fails if the collector cannot be constructed or the registry rejects it.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let metrics = MetricsInner {
            validation_requests: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "parachain_validation_requests_total",
                        "Number of validation requests served.",
                    ),
                    // This slice lists label *names*, not values. Callers pass exactly one
                    // value to `with_label_values`, so exactly one name must be declared;
                    // a cardinality mismatch would panic on the first recorded event.
                    &["validity"],
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
impl<S> CandidateValidationSubsystem<S> {
/// Create a new `CandidateValidationSubsystem` with the given task spawner.
pub fn new(spawn: S) -> Self {
CandidateValidationSubsystem(spawn)
pub fn new(spawn: S, metrics: Metrics) -> Self {
CandidateValidationSubsystem { spawn, metrics }
}
}
@@ -59,10 +111,12 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
C: SubsystemContext<Message = CandidateValidationMessage>,
S: SpawnNamed + Clone + 'static,
{
type Metrics = Metrics;
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "candidate-validation-subsystem",
future: run(ctx, self.0).map(|_| ()).boxed(),
future: run(ctx, self.spawn, self.metrics).map(|_| ()).boxed(),
}
}
}
@@ -70,6 +124,7 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
async fn run(
mut ctx: impl SubsystemContext<Message = CandidateValidationMessage>,
spawn: impl SpawnNamed + Clone + 'static,
metrics: Metrics,
)
-> SubsystemResult<()>
{
@@ -95,8 +150,11 @@ async fn run(
).await;
match res {
Ok(x) => { let _ = response_sender.send(x); }
Err(e)=> return Err(e),
Ok(x) => {
metrics.on_validation_event(&x);
let _ = response_sender.send(x);
}
Err(e) => return Err(e),
}
}
CandidateValidationMessage::ValidateFromExhaustive(
@@ -117,13 +175,16 @@ async fn run(
).await;
match res {
Ok(x) => if let Err(_e) = response_sender.send(x) {
log::warn!(
target: "candidate_validation",
"Requester of candidate validation dropped",
)
Ok(x) => {
metrics.on_validation_event(&x);
if let Err(_e) = response_sender.send(x) {
log::warn!(
target: LOG_TARGET,
"Requester of candidate validation dropped",
)
}
},
Err(e)=> return Err(e),
Err(e) => return Err(e),
}
}
}
@@ -237,7 +298,7 @@ async fn spawn_validate_from_chain_state(
Ok(g) => g,
Err(e) => {
log::warn!(
target: "candidate_validation",
target: LOG_TARGET,
"Error making runtime API request: {:?}",
e,
);
+60 -9
View File
@@ -30,6 +30,7 @@ use polkadot_subsystem::{
FromOverseer, OverseerSignal,
SpawnedSubsystem, Subsystem, SubsystemResult, SubsystemContext,
messages::ChainApiMessage,
metrics::{self, prometheus},
};
use polkadot_primitives::v1::{Block, BlockId};
use sp_blockchain::HeaderBackend;
@@ -39,13 +40,15 @@ use futures::prelude::*;
/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
/// Client that `run` queries (`number`, `hash`, `info`, `header`) to answer requests.
client: Client,
/// Metrics handle; `run` reports the outcome of each answered request through it.
metrics: Metrics,
}
impl<Client> ChainApiSubsystem<Client> {
/// Create a new Chain API subsystem with the given client.
pub fn new(client: Client) -> Self {
pub fn new(client: Client, metrics: Metrics) -> Self {
ChainApiSubsystem {
client
client,
metrics,
}
}
}
@@ -54,9 +57,11 @@ impl<Client, Context> Subsystem<Context> for ChainApiSubsystem<Client> where
Client: HeaderBackend<Block> + 'static,
Context: SubsystemContext<Message = ChainApiMessage>
{
type Metrics = Metrics;
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
future: run(ctx, self.client).map(|_| ()).boxed(),
future: run(ctx, self).map(|_| ()).boxed(),
name: "chain-api-subsystem",
}
}
@@ -64,7 +69,7 @@ impl<Client, Context> Subsystem<Context> for ChainApiSubsystem<Client> where
async fn run<Client>(
mut ctx: impl SubsystemContext<Message = ChainApiMessage>,
client: Client,
subsystem: ChainApiSubsystem<Client>,
) -> SubsystemResult<()>
where
Client: HeaderBackend<Block>,
@@ -76,23 +81,27 @@ where
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
ChainApiMessage::BlockNumber(hash, response_channel) => {
let result = client.number(hash).map_err(|e| e.to_string().into());
let result = subsystem.client.number(hash).map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::FinalizedBlockHash(number, response_channel) => {
// Note: we don't verify it's finalized
let result = client.hash(number).map_err(|e| e.to_string().into());
let result = subsystem.client.hash(number).map_err(|e| e.to_string().into());
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::FinalizedBlockNumber(response_channel) => {
let result = client.info().finalized_number;
let result = subsystem.client.info().finalized_number;
// always succeeds
subsystem.metrics.on_request(true);
let _ = response_channel.send(Ok(result));
},
ChainApiMessage::Ancestors { hash, k, response_channel } => {
let mut hash = hash;
let next_parent = core::iter::from_fn(|| {
let maybe_header = client.header(BlockId::Hash(hash));
let maybe_header = subsystem.client.header(BlockId::Hash(hash));
match maybe_header {
// propagate the error
Err(e) => Some(Err(e.to_string().into())),
@@ -106,6 +115,7 @@ where
});
let result = next_parent.take(k).collect::<Result<Vec<_>, _>>();
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
}
@@ -113,6 +123,46 @@ where
}
}
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
/// Counter vector incremented by `Metrics::on_request`, keyed by whether the request
/// succeeded or failed.
chain_api_requests: prometheus::CounterVec<prometheus::U64>,
}
/// Chain API metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), `on_request` does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Count one Chain API request as `succeeded` or `failed`.
    ///
    /// Does nothing when no collectors are attached.
    fn on_request(&self, succeeded: bool) {
        let inner = match &self.0 {
            Some(inner) => inner,
            None => return,
        };
        let outcome = if succeeded { "succeeded" } else { "failed" };
        inner.chain_api_requests.with_label_values(&[outcome]).inc();
    }
}
impl metrics::Metrics for Metrics {
    /// Create the Chain API requests counter vector and register it with `registry`.
    ///
    /// The vector has a single label dimension, `success`, whose values (`succeeded`,
    /// `failed`) are supplied by `Metrics::on_request`.
    ///
    /// Fails if the collector cannot be constructed or the registry rejects it.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let metrics = MetricsInner {
            chain_api_requests: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "parachain_chain_api_requests_total",
                        "Number of Chain API requests served.",
                    ),
                    // This slice lists label *names*, not values. Callers pass exactly one
                    // value to `with_label_values`, so exactly one name must be declared;
                    // a cardinality mismatch would panic on the first recorded request.
                    &["success"],
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -238,7 +288,8 @@ mod tests {
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let client = TestClient::default();
let chain_api_task = run(ctx, client.clone()).map(|x| x.unwrap());
let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None));
let chain_api_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = test(client, ctx_handle);
futures::executor::block_on(future::join(chain_api_task, test_task));
+54 -13
View File
@@ -30,6 +30,7 @@ use polkadot_node_subsystem::{
AllMessages, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage, RuntimeApiMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::{
self as util,
@@ -50,6 +51,7 @@ struct ProvisioningJob {
provisionable_data_channels: Vec<mpsc::Sender<ProvisionableData>>,
backed_candidates: Vec<BackedCandidate>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
metrics: Metrics,
}
/// This enum defines the messages that the provisioner is prepared to receive.
@@ -134,6 +136,7 @@ impl JobTrait for ProvisioningJob {
type FromJob = FromJob;
type Error = Error;
type RunArgs = ();
type Metrics = Metrics;
const NAME: &'static str = "ProvisioningJob";
@@ -143,11 +146,12 @@ impl JobTrait for ProvisioningJob {
fn run(
relay_parent: Hash,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ToJob>,
sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = ProvisioningJob::new(relay_parent, sender, receiver);
let job = ProvisioningJob::new(relay_parent, metrics, sender, receiver);
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
@@ -160,6 +164,7 @@ impl JobTrait for ProvisioningJob {
impl ProvisioningJob {
pub fn new(
relay_parent: Hash,
metrics: Metrics,
sender: mpsc::Sender<FromJob>,
receiver: mpsc::Receiver<ToJob>,
) -> Self {
@@ -170,6 +175,7 @@ impl ProvisioningJob {
provisionable_data_channels: Vec::new(),
backed_candidates: Vec::new(),
signed_bitfields: Vec::new(),
metrics,
}
}
@@ -190,7 +196,10 @@ impl ProvisioningJob {
)
.await
{
log::warn!(target: "provisioner", "failed to send inherent data: {:?}", err);
log::warn!(target: "provisioner", "failed to assemble or send inherent data: {:?}", err);
self.metrics.on_inherent_data_request(false);
} else {
self.metrics.on_inherent_data_request(true);
}
}
ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => {
@@ -275,17 +284,9 @@ async fn send_inherent_data(
return_sender: oneshot::Sender<ProvisionerInherentData>,
mut from_job: mpsc::Sender<FromJob>,
) -> Result<(), Error> {
let availability_cores = match request_availability_cores(relay_parent, &mut from_job)
let availability_cores = request_availability_cores(relay_parent, &mut from_job)
.await?
.await?
{
Ok(cores) => cores,
Err(runtime_err) => {
// Don't take down the node on runtime API errors.
log::warn!(target: "provisioner", "Encountered a runtime API error: {:?}", runtime_err);
return Ok(());
}
};
.await??;
let bitfields = select_availability_bitfields(&availability_cores, bitfields);
let candidates = select_candidates(
@@ -467,7 +468,47 @@ fn bitfields_indicate_availability(
3 * availability.count_ones() >= 2 * availability.len()
}
delegated_subsystem!(ProvisioningJob(()) <- ToJob as ProvisioningSubsystem);
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
/// Counter vector incremented by `Metrics::on_inherent_data_request`, keyed by whether
/// the request succeeded.
inherent_data_requests: prometheus::CounterVec<prometheus::U64>,
}
/// Provisioner metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), `on_inherent_data_request` does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Count one inherent-data request as `succeeded` or `failed`.
    ///
    /// Does nothing when no collectors are attached.
    fn on_inherent_data_request(&self, succeeded: bool) {
        if let Some(metrics) = &self.0 {
            // The success value is spelled `succeeded`, matching the value used by the
            // other request counters, so dashboards can query them uniformly.
            let outcome = if succeeded { "succeeded" } else { "failed" };
            metrics.inherent_data_requests.with_label_values(&[outcome]).inc();
        }
    }
}
impl metrics::Metrics for Metrics {
    /// Create the inherent-data requests counter vector and register it with `registry`.
    ///
    /// The vector has a single label dimension, `success`; its values are supplied by
    /// `Metrics::on_inherent_data_request`.
    ///
    /// Fails if the collector cannot be constructed or the registry rejects it.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let metrics = MetricsInner {
            inherent_data_requests: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "parachain_inherent_data_requests_total",
                        "Number of InherentData requests served by provisioner.",
                    ),
                    // This slice lists label *names*, not values. Callers pass exactly one
                    // value to `with_label_values`, so exactly one name must be declared;
                    // a cardinality mismatch would panic on the first recorded request.
                    &["success"],
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
delegated_subsystem!(ProvisioningJob((), Metrics) <- ToJob as ProvisioningSubsystem);
#[cfg(test)]
mod tests {
+73 -17
View File
@@ -22,6 +22,7 @@
use polkadot_subsystem::{
Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext,
FromOverseer, OverseerSignal,
metrics::{self, prometheus},
};
use polkadot_subsystem::messages::{
RuntimeApiMessage, RuntimeApiRequest as Request,
@@ -34,12 +35,15 @@ use sp_api::{ProvideRuntimeApi};
use futures::prelude::*;
/// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem<Client>(Client);
pub struct RuntimeApiSubsystem<Client> {
/// Client passed by reference to `make_runtime_api_request` for each incoming request.
client: Client,
/// Metrics handle passed by reference to `make_runtime_api_request` for each request.
metrics: Metrics,
}
impl<Client> RuntimeApiSubsystem<Client> {
/// Create a new Runtime API subsystem wrapping the given client.
pub fn new(client: Client) -> Self {
RuntimeApiSubsystem(client)
/// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Client, metrics: Metrics) -> Self {
RuntimeApiSubsystem { client, metrics }
}
}
@@ -48,9 +52,11 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
Client::Api: ParachainHost<Block>,
Context: SubsystemContext<Message = RuntimeApiMessage>
{
type Metrics = Metrics;
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
future: run(ctx, self.0).map(|_| ()).boxed(),
future: run(ctx, self).map(|_| ()).boxed(),
name: "runtime-api-subsystem",
}
}
@@ -58,7 +64,7 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
async fn run<Client>(
mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
client: Client,
subsystem: RuntimeApiSubsystem<Client>,
) -> SubsystemResult<()> where
Client: ProvideRuntimeApi<Block>,
Client::Api: ParachainHost<Block>,
@@ -70,7 +76,8 @@ async fn run<Client>(
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
&client,
&subsystem.client,
&subsystem.metrics,
relay_parent,
request,
),
@@ -81,6 +88,7 @@ async fn run<Client>(
fn make_runtime_api_request<Client>(
client: &Client,
metrics: &Metrics,
relay_parent: Hash,
request: Request,
) where
@@ -93,7 +101,7 @@ fn make_runtime_api_request<Client>(
let api = client.runtime_api();
let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*)
.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
metrics.on_request(res.is_ok());
let _ = sender.send(res);
}}
}
@@ -114,6 +122,45 @@ fn make_runtime_api_request<Client>(
}
}
/// Prometheus collectors held by `Metrics` once registration has succeeded.
#[derive(Clone)]
struct MetricsInner {
    /// Counter vector incremented by `Metrics::on_request`, keyed by whether the
    /// Runtime API request succeeded or failed.
    runtime_api_requests: prometheus::CounterVec<prometheus::U64>,
}

/// Runtime API metrics.
///
/// Wraps an optional set of collectors. When the inner value is `None` (which is what the
/// derived `Default` produces), `on_request` does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
    /// Count one Runtime API request as `succeeded` or `failed`.
    ///
    /// Does nothing when no collectors are attached.
    fn on_request(&self, succeeded: bool) {
        if let Some(metrics) = &self.0 {
            let outcome = if succeeded { "succeeded" } else { "failed" };
            metrics.runtime_api_requests.with_label_values(&[outcome]).inc();
        }
    }
}

impl metrics::Metrics for Metrics {
    /// Create the Runtime API requests counter vector and register it with `registry`.
    ///
    /// The vector has a single label dimension, `success`, whose values (`succeeded`,
    /// `failed`) are supplied by `Metrics::on_request`.
    ///
    /// Fails if the collector cannot be constructed or the registry rejects it.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let metrics = MetricsInner {
            runtime_api_requests: prometheus::register(
                prometheus::CounterVec::new(
                    prometheus::Opts::new(
                        "parachain_runtime_api_requests_total",
                        "Number of Runtime API requests served.",
                    ),
                    // This slice lists label *names*, not values. Callers pass exactly one
                    // value to `with_label_values`, so exactly one name must be declared;
                    // a cardinality mismatch would panic on the first recorded request.
                    &["success"],
                )?,
                registry,
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -216,7 +263,8 @@ mod tests {
let runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -238,7 +286,8 @@ mod tests {
let runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -260,7 +309,8 @@ mod tests {
let runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -282,7 +332,8 @@ mod tests {
let runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -308,7 +359,8 @@ mod tests {
runtime_api.local_validation_data.insert(para_a, Default::default());
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -343,7 +395,8 @@ mod tests {
let runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -369,7 +422,8 @@ mod tests {
runtime_api.validation_code.insert(para_a, Default::default());
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -408,7 +462,8 @@ mod tests {
runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
@@ -444,7 +499,8 @@ mod tests {
let runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();