diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 900b257634..7c998fc0a5 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4861,6 +4861,7 @@ dependencies = [ "sc-network", "smallvec 1.4.1", "sp-core", + "substrate-prometheus-endpoint", ] [[package]] diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 3ad76ff7f7..793616d907 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -31,6 +31,7 @@ use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage}, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, + metrics::{self, prometheus}, }; use polkadot_node_subsystem_util::{ self as util, request_availability_cores_ctx, request_global_validation_data_ctx, @@ -47,9 +48,18 @@ use std::sync::Arc; /// Collation Generation Subsystem pub struct CollationGenerationSubsystem { config: Option>, + metrics: Metrics, } impl CollationGenerationSubsystem { + /// Create a new instance of the `CollationGenerationSubsystem`. + pub fn new(metrics: Metrics) -> Self { + Self { + config: None, + metrics, + } + } + /// Run this subsystem /// /// Conceptually, this is very simple: it just loops forever. @@ -112,8 +122,9 @@ impl CollationGenerationSubsystem { Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. 
}))) => { // follow the procedure from the guide if let Some(config) = &self.config { + let metrics = self.metrics.clone(); if let Err(err) = - handle_new_activations(config.clone(), &activated, ctx, sender).await + handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await { log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err); return true; @@ -146,13 +157,13 @@ impl Subsystem for CollationGenerationSubsystem where Context: SubsystemContext, { - fn start(self, ctx: Context) -> SpawnedSubsystem { - let subsystem = CollationGenerationSubsystem { config: None }; + type Metrics = Metrics; - let future = Box::pin(subsystem.run(ctx)); + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = Box::pin(self.run(ctx)); SpawnedSubsystem { - name: "CollationGenerationSubsystem", + name: "collation-generation-subsystem", future, } } @@ -178,6 +189,7 @@ async fn handle_new_activations( config: Arc, activated: &[Hash], ctx: &mut Context, + metrics: Metrics, sender: &mpsc::Sender, ) -> Result<()> { // follow the procedure from the guide: @@ -230,6 +242,7 @@ async fn handle_new_activations( let task_global_validation_data = global_validation_data.clone(); let task_config = config.clone(); let mut task_sender = sender.clone(); + let metrics = metrics.clone(); ctx.spawn("collation generation collation builder", Box::pin(async move { let validation_data_hash = validation_data_hash(&task_global_validation_data, &local_validation_data); @@ -273,6 +286,8 @@ async fn handle_new_activations( }, }; + metrics.on_collation_generated(); + if let Err(err) = task_sender.send(AllMessages::CollatorProtocol( CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity) )).await { @@ -305,6 +320,38 @@ fn erasure_root( Ok(polkadot_erasure_coding::branches(&chunks).root()) } +#[derive(Clone)] +struct MetricsInner { + collations_generated_total: prometheus::Counter, +} + +/// CollationGenerationSubsystem metrics. 
+#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_collation_generated(&self) { + if let Some(metrics) = &self.0 { + metrics.collations_generated_total.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> std::result::Result { + let metrics = MetricsInner { + collations_generated_total: prometheus::register( + prometheus::Counter::new( + "parachain_collations_generated_total", + "Number of collations generated." + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + #[cfg(test)] mod tests { mod handle_new_activations { @@ -411,6 +458,7 @@ mod tests { test_config(123), &subsystem_activated_hashes, &mut ctx, + Metrics(None), &tx, ) .await @@ -498,7 +546,7 @@ mod tests { let (tx, _rx) = mpsc::channel(0); subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations(test_config(16), &activated_hashes, &mut ctx, &tx) + handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx) .await .unwrap(); }); @@ -581,7 +629,7 @@ mod tests { let sent_messages = Arc::new(Mutex::new(Vec::new())); let subsystem_sent_messages = sent_messages.clone(); subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, &tx) + handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx) .await .unwrap(); diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 9c33052afc..84381e438e 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -34,6 +34,7 @@ use polkadot_primitives::v1::{ }; use polkadot_subsystem::{ FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem, + metrics::{self, prometheus}, }; use polkadot_subsystem::messages::AvailabilityStoreMessage; @@ -59,6 +60,7 @@ enum Error { /// An implementation of the Availability Store subsystem. 
pub struct AvailabilityStoreSubsystem { inner: Arc, + metrics: Metrics, } fn available_data_key(candidate_hash: &Hash) -> Vec { @@ -85,7 +87,7 @@ pub struct Config { impl AvailabilityStoreSubsystem { /// Create a new `AvailabilityStoreSubsystem` with a given config on disk. - pub fn new_on_disk(config: Config) -> io::Result { + pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result { let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS); if let Some(cache_size) = config.cache_size { @@ -106,6 +108,7 @@ impl AvailabilityStoreSubsystem { Ok(Self { inner: Arc::new(db), + metrics, }) } @@ -113,6 +116,7 @@ impl AvailabilityStoreSubsystem { fn new_in_memory(inner: Arc) -> Self { Self { inner, + metrics: Metrics(None), } } } @@ -130,7 +134,7 @@ where Ok(FromOverseer::Signal(Conclude)) => break, Ok(FromOverseer::Signal(_)) => (), Ok(FromOverseer::Communication { msg }) => { - process_message(&subsystem.inner, msg)?; + process_message(&subsystem.inner, &subsystem.metrics, msg)?; } Err(_) => break, } @@ -142,7 +146,7 @@ where Ok(()) } -fn process_message(db: &Arc, msg: AvailabilityStoreMessage) -> Result<(), Error> { +fn process_message(db: &Arc, metrics: &Metrics, msg: AvailabilityStoreMessage) -> Result<(), Error> { use AvailabilityStoreMessage::*; match msg { QueryAvailableData(hash, tx) => { @@ -152,10 +156,10 @@ fn process_message(db: &Arc, msg: AvailabilityStoreMessage) -> R tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?; } QueryChunk(hash, id, tx) => { - tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?; + tx.send(get_chunk(db, &hash, id, metrics)?).map_err(|_| oneshot::Canceled)?; } QueryChunkAvailability(hash, id, tx) => { - tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?; + tx.send(get_chunk(db, &hash, id, metrics)?.is_some()).map_err(|_| oneshot::Canceled)?; } StoreChunk(hash, id, chunk, tx) => { match store_chunk(db, &hash, id, chunk) { @@ -169,7 +173,7 @@ fn 
process_message(db: &Arc, msg: AvailabilityStoreMessage) -> R } } StoreAvailableData(hash, id, n_validators, av_data, tx) => { - match store_available_data(db, &hash, id, n_validators, av_data) { + match store_available_data(db, &hash, id, n_validators, av_data, metrics) { Err(e) => { tx.send(Err(())).map_err(|_| oneshot::Canceled)?; return Err(e); @@ -194,11 +198,12 @@ fn store_available_data( id: Option, n_validators: u32, available_data: AvailableData, + metrics: &Metrics, ) -> Result<(), Error> { let mut tx = DBTransaction::new(); if let Some(index) = id { - let chunks = get_chunks(&available_data, n_validators as usize)?; + let chunks = get_chunks(&available_data, n_validators as usize, metrics)?; store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?; } @@ -231,7 +236,7 @@ fn store_chunk(db: &Arc, candidate_hash: &Hash, _n_validators: u Ok(()) } -fn get_chunk(db: &Arc, candidate_hash: &Hash, index: u32) +fn get_chunk(db: &Arc, candidate_hash: &Hash, index: u32, metrics: &Metrics) -> Result, Error> { if let Some(chunk) = query_inner( @@ -242,7 +247,7 @@ fn get_chunk(db: &Arc, candidate_hash: &Hash, index: u32) } if let Some(data) = available_data(db, candidate_hash) { - let mut chunks = get_chunks(&data.data, data.n_validators as usize)?; + let mut chunks = get_chunks(&data.data, data.n_validators as usize, metrics)?; let desired_chunk = chunks.get(index as usize).cloned(); for chunk in chunks.drain(..) 
{ store_chunk(db, candidate_hash, data.n_validators, chunk)?; @@ -271,6 +276,8 @@ impl Subsystem for AvailabilityStoreSubsystem where Context: SubsystemContext, { + type Metrics = Metrics; + fn start(self, ctx: Context) -> SpawnedSubsystem { let future = Box::pin(async move { if let Err(e) = run(self, ctx).await { @@ -285,8 +292,9 @@ impl Subsystem for AvailabilityStoreSubsystem } } -fn get_chunks(data: &AvailableData, n_validators: usize) -> Result, Error> { +fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result, Error> { let chunks = erasure::obtain_chunks_v1(n_validators, data)?; + metrics.on_chunks_received(chunks.len()); let branches = erasure::branches(chunks.as_ref()); Ok(chunks @@ -302,6 +310,41 @@ fn get_chunks(data: &AvailableData, n_validators: usize) -> Result, +} + +/// Availability metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_chunks_received(&self, count: usize) { + if let Some(metrics) = &self.0 { + use core::convert::TryFrom as _; + // assume usize fits into u64 + let by = u64::try_from(count).unwrap_or_default(); + metrics.received_availability_chunks_total.inc_by(by); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + received_availability_chunks_total: prometheus::register( + prometheus::Counter::new( + "parachain_received_availability_chunks_total", + "Number of availability chunks received.", + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + #[cfg(test)] mod tests { use super::*; @@ -501,7 +544,8 @@ mod tests { omitted_validation, }; - let chunks_expected = get_chunks(&available_data, n_validators as usize).unwrap(); + let no_metrics = Metrics(None); + let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap(); let (tx, rx) = oneshot::channel(); let block_msg = AvailabilityStoreMessage::StoreAvailableData( diff --git 
a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index a77d98cd23..30a95965eb 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -45,6 +45,7 @@ use polkadot_subsystem::{ ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest, }, + metrics::{self, prometheus}, }; use polkadot_node_subsystem_util::{ self as util, @@ -100,6 +101,7 @@ struct CandidateBackingJob { reported_misbehavior_for: HashSet, table: Table, table_context: TableContext, + metrics: Metrics, } const fn group_quorum(n_validators: usize) -> usize { @@ -432,6 +434,7 @@ impl CandidateBackingJob { &candidate, pov, ).await { + self.metrics.on_candidate_seconded(); self.seconded = Some(candidate_hash); } } @@ -528,7 +531,9 @@ impl CandidateBackingJob { } fn sign_statement(&self, statement: Statement) -> Option { - Some(self.table_context.validator.as_ref()?.sign(statement)) + let signed = self.table_context.validator.as_ref()?.sign(statement); + self.metrics.on_statement_signed(); + Some(signed) } fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> { @@ -672,12 +677,14 @@ impl util::JobTrait for CandidateBackingJob { type FromJob = FromJob; type Error = Error; type RunArgs = KeyStorePtr; + type Metrics = Metrics; const NAME: &'static str = "CandidateBackingJob"; fn run( parent: Hash, keystore: KeyStorePtr, + metrics: Metrics, rx_to: mpsc::Receiver, mut tx_from: mpsc::Sender, ) -> Pin> + Send>> { @@ -764,6 +771,7 @@ impl util::JobTrait for CandidateBackingJob { reported_misbehavior_for: HashSet::new(), table: Table::default(), table_context, + metrics, }; job.run_loop().await @@ -772,7 +780,53 @@ impl util::JobTrait for CandidateBackingJob { } } -delegated_subsystem!(CandidateBackingJob(KeyStorePtr) <- ToJob as CandidateBackingSubsystem); +#[derive(Clone)] +struct MetricsInner { + signed_statements_total: prometheus::Counter, + 
candidates_seconded_total: prometheus::Counter +} + +/// Candidate backing metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_statement_signed(&self) { + if let Some(metrics) = &self.0 { + metrics.signed_statements_total.inc(); + } + } + + fn on_candidate_seconded(&self) { + if let Some(metrics) = &self.0 { + metrics.candidates_seconded_total.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + signed_statements_total: prometheus::register( + prometheus::Counter::new( + "parachain_signed_statements_total", + "Number of statements signed.", + )?, + registry, + )?, + candidates_seconded_total: prometheus::register( + prometheus::Counter::new( + "parachain_candidates_seconded_total", + "Number of candidates seconded.", + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + +delegated_subsystem!(CandidateBackingJob(KeyStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem); #[cfg(test)] mod tests { @@ -904,7 +958,7 @@ mod tests { let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); - let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone()); + let subsystem = CandidateBackingSubsystem::run(context, keystore, Metrics(None), pool.clone()); let test_fut = test(TestHarness { virtual_overseer, diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index feca8f0c8f..66badf3d18 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -29,6 +29,7 @@ use polkadot_node_subsystem::{ BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, }, errors::RuntimeApiError, + metrics::{self, prometheus}, }; use polkadot_node_subsystem_util::{ self as util, JobManager, JobTrait, ToJobTrait, Validator @@ -252,11 +253,44 @@ async fn 
construct_availability_bitfield( } } +#[derive(Clone)] +struct MetricsInner { + bitfields_signed_total: prometheus::Counter, +} + +/// Bitfield signing metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_bitfield_signed(&self) { + if let Some(metrics) = &self.0 { + metrics.bitfields_signed_total.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + bitfields_signed_total: prometheus::register( + prometheus::Counter::new( + "parachain_bitfields_signed_total", + "Number of bitfields signed.", + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + impl JobTrait for BitfieldSigningJob { type ToJob = ToJob; type FromJob = FromJob; type Error = Error; type RunArgs = KeyStorePtr; + type Metrics = Metrics; const NAME: &'static str = "BitfieldSigningJob"; @@ -264,6 +298,7 @@ impl JobTrait for BitfieldSigningJob { fn run( relay_parent: Hash, keystore: Self::RunArgs, + metrics: Self::Metrics, _receiver: mpsc::Receiver, mut sender: mpsc::Sender, ) -> Pin> + Send>> { @@ -295,6 +330,7 @@ impl JobTrait for BitfieldSigningJob { }; let signed_bitfield = validator.sign(bitfield); + metrics.on_bitfield_signed(); // make an anonymous scope to contain some use statements to simplify creating the outbound message { diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 179f9be309..bb832ee195 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -23,9 +23,11 @@ use polkadot_subsystem::{ Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemResult, FromOverseer, OverseerSignal, -}; -use polkadot_subsystem::messages::{ - AllMessages, CandidateValidationMessage, RuntimeApiMessage, ValidationFailed, RuntimeApiRequest, + messages::{ + AllMessages, CandidateValidationMessage, RuntimeApiMessage, + 
ValidationFailed, RuntimeApiRequest, + }, + metrics::{self, prometheus}, }; use polkadot_subsystem::errors::RuntimeApiError; use polkadot_node_primitives::{ValidationResult, ValidationOutputs, InvalidCandidate}; @@ -45,13 +47,63 @@ use futures::prelude::*; use std::sync::Arc; +const LOG_TARGET: &'static str = "candidate_validation"; + /// The candidate validation subsystem. -pub struct CandidateValidationSubsystem(S); +pub struct CandidateValidationSubsystem { + spawn: S, + metrics: Metrics, +} + +#[derive(Clone)] +struct MetricsInner { + validation_requests: prometheus::CounterVec, +} + +/// Candidate validation metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_validation_event(&self, event: &Result) { + if let Some(metrics) = &self.0 { + match event { + Ok(ValidationResult::Valid(_)) => { + metrics.validation_requests.with_label_values(&["valid"]).inc(); + }, + Ok(ValidationResult::Invalid(_)) => { + metrics.validation_requests.with_label_values(&["invalid"]).inc(); + }, + Err(_) => { + metrics.validation_requests.with_label_values(&["failed"]).inc(); + }, + } + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + validation_requests: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_validation_requests_total", + "Number of validation requests served.", + ), + &["validity"], + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} impl CandidateValidationSubsystem { /// Create a new `CandidateValidationSubsystem` with the given task spawner. 
- pub fn new(spawn: S) -> Self { - CandidateValidationSubsystem(spawn) + pub fn new(spawn: S, metrics: Metrics) -> Self { + CandidateValidationSubsystem { spawn, metrics } } } @@ -59,10 +111,12 @@ impl Subsystem for CandidateValidationSubsystem where C: SubsystemContext, S: SpawnNamed + Clone + 'static, { + type Metrics = Metrics; + fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { name: "candidate-validation-subsystem", - future: run(ctx, self.0).map(|_| ()).boxed(), + future: run(ctx, self.spawn, self.metrics).map(|_| ()).boxed(), } } } @@ -70,6 +124,7 @@ impl Subsystem for CandidateValidationSubsystem where async fn run( mut ctx: impl SubsystemContext, spawn: impl SpawnNamed + Clone + 'static, + metrics: Metrics, ) -> SubsystemResult<()> { @@ -95,8 +150,11 @@ async fn run( ).await; match res { - Ok(x) => { let _ = response_sender.send(x); } - Err(e)=> return Err(e), + Ok(x) => { + metrics.on_validation_event(&x); + let _ = response_sender.send(x); + } + Err(e) => return Err(e), } } CandidateValidationMessage::ValidateFromExhaustive( @@ -117,13 +175,16 @@ async fn run( ).await; match res { - Ok(x) => if let Err(_e) = response_sender.send(x) { - log::warn!( - target: "candidate_validation", - "Requester of candidate validation dropped", - ) + Ok(x) => { + metrics.on_validation_event(&x); + if let Err(_e) = response_sender.send(x) { + log::warn!( + target: LOG_TARGET, + "Requester of candidate validation dropped", + ) + } }, - Err(e)=> return Err(e), + Err(e) => return Err(e), } } } @@ -237,7 +298,7 @@ async fn spawn_validate_from_chain_state( Ok(g) => g, Err(e) => { log::warn!( - target: "candidate_validation", + target: LOG_TARGET, "Error making runtime API request: {:?}", e, ); diff --git a/polkadot/node/core/chain-api/src/lib.rs b/polkadot/node/core/chain-api/src/lib.rs index 20213e3c08..d3be1c4bf4 100644 --- a/polkadot/node/core/chain-api/src/lib.rs +++ b/polkadot/node/core/chain-api/src/lib.rs @@ -30,6 +30,7 @@ use polkadot_subsystem::{ 
FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemResult, SubsystemContext, messages::ChainApiMessage, + metrics::{self, prometheus}, }; use polkadot_primitives::v1::{Block, BlockId}; use sp_blockchain::HeaderBackend; @@ -39,13 +40,15 @@ use futures::prelude::*; /// The Chain API Subsystem implementation. pub struct ChainApiSubsystem { client: Client, + metrics: Metrics, } impl ChainApiSubsystem { /// Create a new Chain API subsystem with the given client. - pub fn new(client: Client) -> Self { + pub fn new(client: Client, metrics: Metrics) -> Self { ChainApiSubsystem { - client + client, + metrics, } } } @@ -54,9 +57,11 @@ impl Subsystem for ChainApiSubsystem where Client: HeaderBackend + 'static, Context: SubsystemContext { + type Metrics = Metrics; + fn start(self, ctx: Context) -> SpawnedSubsystem { SpawnedSubsystem { - future: run(ctx, self.client).map(|_| ()).boxed(), + future: run(ctx, self).map(|_| ()).boxed(), name: "chain-api-subsystem", } } @@ -64,7 +69,7 @@ impl Subsystem for ChainApiSubsystem where async fn run( mut ctx: impl SubsystemContext, - client: Client, + subsystem: ChainApiSubsystem, ) -> SubsystemResult<()> where Client: HeaderBackend, @@ -76,23 +81,27 @@ where FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Communication { msg } => match msg { ChainApiMessage::BlockNumber(hash, response_channel) => { - let result = client.number(hash).map_err(|e| e.to_string().into()); + let result = subsystem.client.number(hash).map_err(|e| e.to_string().into()); + subsystem.metrics.on_request(result.is_ok()); let _ = response_channel.send(result); }, ChainApiMessage::FinalizedBlockHash(number, response_channel) => { // Note: we don't verify it's finalized - let result = client.hash(number).map_err(|e| e.to_string().into()); + let result = subsystem.client.hash(number).map_err(|e| e.to_string().into()); + subsystem.metrics.on_request(result.is_ok()); let _ = response_channel.send(result); }, 
ChainApiMessage::FinalizedBlockNumber(response_channel) => { - let result = client.info().finalized_number; + let result = subsystem.client.info().finalized_number; + // always succeeds + subsystem.metrics.on_request(true); let _ = response_channel.send(Ok(result)); }, ChainApiMessage::Ancestors { hash, k, response_channel } => { let mut hash = hash; let next_parent = core::iter::from_fn(|| { - let maybe_header = client.header(BlockId::Hash(hash)); + let maybe_header = subsystem.client.header(BlockId::Hash(hash)); match maybe_header { // propagate the error Err(e) => Some(Err(e.to_string().into())), @@ -106,6 +115,7 @@ where }); let result = next_parent.take(k).collect::, _>>(); + subsystem.metrics.on_request(result.is_ok()); let _ = response_channel.send(result); }, } @@ -113,6 +123,46 @@ where } } +#[derive(Clone)] +struct MetricsInner { + chain_api_requests: prometheus::CounterVec, +} + +/// Chain API metrics. +#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_request(&self, succeeded: bool) { + if let Some(metrics) = &self.0 { + if succeeded { + metrics.chain_api_requests.with_label_values(&["succeeded"]).inc(); + } else { + metrics.chain_api_requests.with_label_values(&["failed"]).inc(); + } + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + chain_api_requests: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_chain_api_requests_total", + "Number of Chain API requests served.", + ), + &["success"], + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + + +#[cfg(test)] mod tests { use super::*; @@ -238,7 +288,8 @@ mod tests { let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new()); let client = TestClient::default(); - let chain_api_task = run(ctx, client.clone()).map(|x| x.unwrap()); + let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None)); + let
chain_api_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = test(client, ctx_handle); futures::executor::block_on(future::join(chain_api_task, test_task)); diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 9a92858a9e..ef93d15ab8 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -30,6 +30,7 @@ use polkadot_node_subsystem::{ AllMessages, ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, RuntimeApiMessage, }, + metrics::{self, prometheus}, }; use polkadot_node_subsystem_util::{ self as util, @@ -50,6 +51,7 @@ struct ProvisioningJob { provisionable_data_channels: Vec>, backed_candidates: Vec, signed_bitfields: Vec, + metrics: Metrics, } /// This enum defines the messages that the provisioner is prepared to receive. @@ -134,6 +136,7 @@ impl JobTrait for ProvisioningJob { type FromJob = FromJob; type Error = Error; type RunArgs = (); + type Metrics = Metrics; const NAME: &'static str = "ProvisioningJob"; @@ -143,11 +146,12 @@ impl JobTrait for ProvisioningJob { fn run( relay_parent: Hash, _run_args: Self::RunArgs, + metrics: Self::Metrics, receiver: mpsc::Receiver, sender: mpsc::Sender, ) -> Pin> + Send>> { async move { - let job = ProvisioningJob::new(relay_parent, sender, receiver); + let job = ProvisioningJob::new(relay_parent, metrics, sender, receiver); // it isn't necessary to break run_loop into its own function, // but it's convenient to separate the concerns in this way @@ -160,6 +164,7 @@ impl JobTrait for ProvisioningJob { impl ProvisioningJob { pub fn new( relay_parent: Hash, + metrics: Metrics, sender: mpsc::Sender, receiver: mpsc::Receiver, ) -> Self { @@ -170,6 +175,7 @@ impl ProvisioningJob { provisionable_data_channels: Vec::new(), backed_candidates: Vec::new(), signed_bitfields: Vec::new(), + metrics, } } @@ -190,7 +196,10 @@ impl ProvisioningJob { ) .await { - log::warn!(target: "provisioner", 
"failed to send inherent data: {:?}", err); + log::warn!(target: "provisioner", "failed to assemble or send inherent data: {:?}", err); + self.metrics.on_inherent_data_request(false); + } else { + self.metrics.on_inherent_data_request(true); } } ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => { @@ -275,17 +284,9 @@ async fn send_inherent_data( return_sender: oneshot::Sender, mut from_job: mpsc::Sender, ) -> Result<(), Error> { - let availability_cores = match request_availability_cores(relay_parent, &mut from_job) + let availability_cores = request_availability_cores(relay_parent, &mut from_job) .await? - .await? - { - Ok(cores) => cores, - Err(runtime_err) => { - // Don't take down the node on runtime API errors. - log::warn!(target: "provisioner", "Encountered a runtime API error: {:?}", runtime_err); - return Ok(()); - } - }; + .await??; let bitfields = select_availability_bitfields(&availability_cores, bitfields); let candidates = select_candidates( @@ -467,7 +468,47 @@ fn bitfields_indicate_availability( 3 * availability.count_ones() >= 2 * availability.len() } -delegated_subsystem!(ProvisioningJob(()) <- ToJob as ProvisioningSubsystem); +#[derive(Clone)] +struct MetricsInner { + inherent_data_requests: prometheus::CounterVec, +} + +/// Candidate backing metrics. 
+#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_inherent_data_request(&self, succeeded: bool) { + if let Some(metrics) = &self.0 { + if succeeded { + metrics.inherent_data_requests.with_label_values(&["succeeded"]).inc(); + } else { + metrics.inherent_data_requests.with_label_values(&["failed"]).inc(); + } + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + inherent_data_requests: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_inherent_data_requests_total", + "Number of InherentData requests served by provisioner.", + ), + &["success"], + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + + +delegated_subsystem!(ProvisioningJob((), Metrics) <- ToJob as ProvisioningSubsystem); #[cfg(test)] mod tests { diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 9b93bb914b..dd5f247eb0 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -22,6 +22,7 @@ use polkadot_subsystem::{ Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext, FromOverseer, OverseerSignal, + metrics::{self, prometheus}, }; use polkadot_subsystem::messages::{ RuntimeApiMessage, RuntimeApiRequest as Request, @@ -34,12 +35,15 @@ use sp_api::{ProvideRuntimeApi}; use futures::prelude::*; /// The `RuntimeApiSubsystem`. See module docs for more details. -pub struct RuntimeApiSubsystem(Client); +pub struct RuntimeApiSubsystem { + client: Client, + metrics: Metrics, +} impl RuntimeApiSubsystem { - /// Create a new Runtime API subsystem wrapping the given client. - pub fn new(client: Client) -> Self { - RuntimeApiSubsystem(client) + /// Create a new Runtime API subsystem wrapping the given client and metrics.
+ pub fn new(client: Client, metrics: Metrics) -> Self { + RuntimeApiSubsystem { client, metrics } } } @@ -48,9 +52,11 @@ impl Subsystem for RuntimeApiSubsystem where Client::Api: ParachainHost, Context: SubsystemContext { + type Metrics = Metrics; + fn start(self, ctx: Context) -> SpawnedSubsystem { SpawnedSubsystem { - future: run(ctx, self.0).map(|_| ()).boxed(), + future: run(ctx, self).map(|_| ()).boxed(), name: "runtime-api-subsystem", } } @@ -58,7 +64,7 @@ impl Subsystem for RuntimeApiSubsystem where async fn run( mut ctx: impl SubsystemContext, - client: Client, + subsystem: RuntimeApiSubsystem, ) -> SubsystemResult<()> where Client: ProvideRuntimeApi, Client::Api: ParachainHost, @@ -70,7 +76,8 @@ async fn run( FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Communication { msg } => match msg { RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request( - &client, + &subsystem.client, + &subsystem.metrics, relay_parent, request, ), @@ -81,6 +88,7 @@ async fn run( fn make_runtime_api_request( client: &Client, + metrics: &Metrics, relay_parent: Hash, request: Request, ) where @@ -93,7 +101,7 @@ fn make_runtime_api_request( let api = client.runtime_api(); let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*) .map_err(|e| RuntimeApiError::from(format!("{:?}", e))); - + metrics.on_request(res.is_ok()); let _ = sender.send(res); }} } @@ -114,6 +122,45 @@ fn make_runtime_api_request( } } +#[derive(Clone)] +struct MetricsInner { + chain_api_requests: prometheus::CounterVec, +} + +/// Runtime API metrics. 
+#[derive(Default, Clone)] +pub struct Metrics(Option); + +impl Metrics { + fn on_request(&self, succeeded: bool) { + if let Some(metrics) = &self.0 { + if succeeded { + metrics.chain_api_requests.with_label_values(&["succeeded"]).inc(); + } else { + metrics.chain_api_requests.with_label_values(&["failed"]).inc(); + } + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + chain_api_requests: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "parachain_runtime_api_requests_total", + "Number of Runtime API requests served.", + ), + &["success"], + )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + #[cfg(test)] mod tests { use super::*; @@ -216,7 +263,8 @@ mod tests { let runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -238,7 +286,8 @@ mod tests { let runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -260,7 +309,8 @@ mod tests { let runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -282,7 +332,8 @@ mod
tests { let runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -308,7 +359,8 @@ mod tests { runtime_api.local_validation_data.insert(para_a, Default::default()); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -343,7 +395,8 @@ mod tests { let runtime_api = MockRuntimeApi::default(); let relay_parent = [1; 32].into(); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -369,7 +422,8 @@ mod tests { runtime_api.validation_code.insert(para_a, Default::default()); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -408,7 +462,8 @@ mod tests { runtime_api.candidate_pending_availability.insert(para_a, Default::default()); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); @@ -444,7 +499,8 @@ mod tests { let runtime_api = MockRuntimeApi::default(); let 
relay_parent = [1; 32].into(); - let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None)); + let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap()); let test_task = async move { let (tx, rx) = oneshot::channel(); diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 2a5a086065..32b61f5817 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -756,6 +756,8 @@ impl Subsystem for AvailabilityDistributionSubsystem where Context: SubsystemContext + Sync + Send, { + type Metrics = (); + fn start(self, ctx: Context) -> SpawnedSubsystem { SpawnedSubsystem { name: "availability-distribution-subsystem", diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 8da071bddd..67475aea15 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -558,6 +558,8 @@ impl Subsystem for BitfieldDistribution where C: SubsystemContext + Sync + Send, { + type Metrics = (); + fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { name: "bitfield-distribution-subsystem", diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 9f347d39f1..c83e8b7c8b 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -205,6 +205,8 @@ impl Subsystem for NetworkBridge Net: Network, Context: SubsystemContext, { + type Metrics = (); + fn start(self, ctx: Context) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run_network`. 
diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 994090c0d5..552981244d 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -51,6 +51,8 @@ pub struct PoVDistribution; impl Subsystem for PoVDistribution where C: SubsystemContext { + type Metrics = (); + fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 3f8e6842a8..cbdc8e5845 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -65,6 +65,8 @@ pub struct StatementDistribution; impl Subsystem for StatementDistribution where C: SubsystemContext { + type Metrics = (); + fn start(self, ctx: C) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run`. 
diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index 3ebcb34825..4087429e6d 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -76,6 +76,8 @@ impl Subsystem1 { impl Subsystem for Subsystem1 where C: SubsystemContext { + type Metrics = (); // no Prometheus metrics + fn start(self, ctx: C) -> SpawnedSubsystem { let future = Box::pin(async move { Self::run(ctx).await; @@ -121,6 +123,8 @@ impl Subsystem2 { impl Subsystem for Subsystem2 where C: SubsystemContext { + type Metrics = (); // no Prometheus metrics + fn start(self, ctx: C) -> SpawnedSubsystem { let future = Box::pin(async move { Self::run(ctx).await; @@ -161,6 +165,7 @@ fn main() { let (overseer, _handler) = Overseer::new( vec![], all_subsystems, + None, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 452ab1f4b6..321a20f568 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -84,6 +84,7 @@ use polkadot_subsystem::messages::{ pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, SpawnedSubsystem, ActiveLeavesUpdate, + metrics::{self, prometheus}, }; use polkadot_node_primitives::SpawnNamed; @@ -92,6 +93,9 @@ use polkadot_node_primitives::SpawnNamed; const CHANNEL_CAPACITY: usize = 1024; // A graceful `Overseer` teardown time delay. const STOP_DELAY: u64 = 1; +// Target for logs. +const LOG_TARGET: &'static str = "overseer"; + /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// @@ -325,10 +329,6 @@ impl SubsystemContext for OverseerSubsystemContext { } } -/// A subsystem compatible with the overseer - one which can be run in the context of the -/// overseer. -pub type CompatibleSubsystem = Box> + Send>; - /// A subsystem that we oversee. 
/// /// Ties together the [`Subsystem`] itself and it's running instance @@ -336,7 +336,6 @@ pub type CompatibleSubsystem = Box> /// for whatever reason). /// /// [`Subsystem`]: trait.Subsystem.html -#[allow(dead_code)] struct OverseenSubsystem { instance: Option>, } @@ -407,6 +406,9 @@ pub struct Overseer { /// The set of the "active leaves". active_leaves: HashSet<(Hash, BlockNumber)>, + + /// Various Prometheus metrics. + metrics: Metrics, } /// This struct is passed as an argument to create a new instance of an [`Overseer`]. @@ -453,6 +455,52 @@ pub struct AllSubsystems, + deactivated_heads_total: prometheus::Counter, +} + +#[derive(Default, Clone)] +struct Metrics(Option); + +impl Metrics { + fn on_head_activated(&self) { + if let Some(metrics) = &self.0 { + metrics.activated_heads_total.inc(); + } + } + + fn on_head_deactivated(&self) { + if let Some(metrics) = &self.0 { + metrics.deactivated_heads_total.inc(); + } + } +} + +impl metrics::Metrics for Metrics { + fn try_register(registry: &prometheus::Registry) -> Result { + let metrics = MetricsInner { + activated_heads_total: prometheus::register( + prometheus::Counter::new( + "parachain_activated_heads_total", + "Number of activated heads." + )?, + registry, + )?, + deactivated_heads_total: prometheus::register( + prometheus::Counter::new( + "parachain_deactivated_heads_total", + "Number of deactivated heads." 
+ )?, + registry, + )?, + }; + Ok(Metrics(Some(metrics))) + } +} + impl Overseer where S: SpawnNamed, @@ -500,8 +548,10 @@ where /// struct ValidationSubsystem; /// /// impl Subsystem for ValidationSubsystem - /// where C: SubsystemContext + /// where C: SubsystemContext /// { + /// type Metrics = (); + /// /// fn start( /// self, /// mut ctx: C, @@ -539,6 +589,7 @@ where /// let (overseer, _handler) = Overseer::new( /// vec![], /// all_subsystems, + /// None, /// spawner, /// ).unwrap(); /// @@ -558,6 +609,7 @@ where pub fn new( leaves: impl IntoIterator, all_subsystems: AllSubsystems, + prometheus_registry: Option<&prometheus::Registry>, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> where @@ -692,13 +744,15 @@ where all_subsystems.collator_protocol, )?; - let active_leaves = HashSet::new(); - let leaves = leaves .into_iter() .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) .collect(); + let active_leaves = HashSet::new(); + + let metrics = ::register(prometheus_registry); + let this = Self { candidate_validation_subsystem, candidate_backing_subsystem, @@ -721,6 +775,7 @@ where events_rx, leaves, active_leaves, + metrics, }; Ok((this, handler)) @@ -811,6 +866,7 @@ where for leaf in leaves.into_iter() { update.activated.push(leaf.0); self.active_leaves.insert(leaf); + self.metrics.on_head_activated(); } self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; @@ -850,7 +906,7 @@ where // Some subsystem exited? It's time to panic. 
if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) { - log::error!("Subsystem finished unexpectedly {:?}", finished); + log::error!(target: LOG_TARGET, "Subsystem finished unexpectedly {:?}", finished); self.stop().await; return Err(SubsystemError); } @@ -865,11 +921,13 @@ where if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) { update.deactivated.push(parent.0); + self.metrics.on_head_deactivated(); } if !self.active_leaves.contains(&(block.hash, block.number)) { update.activated.push(block.hash); self.active_leaves.insert((block.hash, block.number)); + self.metrics.on_head_activated(); } self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; @@ -879,10 +937,12 @@ where async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { let mut update = ActiveLeavesUpdate::default(); + let metrics = &self.metrics; self.active_leaves.retain(|(h, n)| { if *n <= block.number { update.deactivated.push(*h); + metrics.on_head_deactivated(); false } else { true @@ -1103,6 +1163,8 @@ mod tests { impl Subsystem for TestSubsystem1 where C: SubsystemContext { + type Metrics = (); + fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0; SpawnedSubsystem { @@ -1131,6 +1193,8 @@ mod tests { impl Subsystem for TestSubsystem2 where C: SubsystemContext { + type Metrics = (); + fn start(self, mut ctx: C) -> SpawnedSubsystem { let sender = self.0.clone(); SpawnedSubsystem { @@ -1177,6 +1241,8 @@ mod tests { impl Subsystem for TestSubsystem4 where C: SubsystemContext { + type Metrics = (); + fn start(self, mut _ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { name: "test-subsystem-4", @@ -1187,6 +1253,7 @@ mod tests { } } + // Checks that a minimal configuration of two jobs can run and exchange messages. 
#[test] fn overseer_works() { @@ -1216,6 +1283,7 @@ mod tests { let (overseer, mut handler) = Overseer::new( vec![], all_subsystems, + None, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -1252,6 +1320,86 @@ mod tests { assert_eq!(s1_results, (0..10).collect::>()); }); } + + // Checks activated/deactivated metrics are updated properly. + #[test] + fn overseer_metrics_work() { + let spawner = sp_core::testing::TaskExecutor::new(); + + executor::block_on(async move { + let first_block_hash = [1; 32].into(); + let second_block_hash = [2; 32].into(); + let third_block_hash = [3; 32].into(); + + let first_block = BlockInfo { + hash: first_block_hash, + parent_hash: [0; 32].into(), + number: 1, + }; + let second_block = BlockInfo { + hash: second_block_hash, + parent_hash: first_block_hash, + number: 2, + }; + let third_block = BlockInfo { + hash: third_block_hash, + parent_hash: second_block_hash, + number: 3, + }; + + let all_subsystems = AllSubsystems { + collation_generation: DummySubsystem, + candidate_validation: DummySubsystem, + candidate_backing: DummySubsystem, + candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_signing: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + chain_api: DummySubsystem, + }; + let registry = prometheus::Registry::new(); + let (overseer, mut handler) = Overseer::new( + vec![first_block], + all_subsystems, + Some(®istry), + spawner, + ).unwrap(); + let overseer_fut = overseer.run().fuse(); + + pin_mut!(overseer_fut); + + handler.block_imported(second_block).await.unwrap(); + handler.block_imported(third_block).await.unwrap(); + handler.stop().await.unwrap(); + + select! 
{ + res = overseer_fut => { + assert!(res.is_ok()); + let (activated, deactivated) = extract_metrics(®istry); + assert_eq!(activated, 3); + assert_eq!(deactivated, 2); + }, + complete => (), + } + }); + } + + fn extract_metrics(registry: &prometheus::Registry) -> (u64, u64) { + let gather = registry.gather(); + assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); + assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); + let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; + let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; + (activated, deactivated) + } + // Spawn a subsystem that immediately exits. // // Should immediately conclude the overseer itself with an error. @@ -1281,6 +1429,7 @@ mod tests { let (overseer, _handle) = Overseer::new( vec![], all_subsystems, + None, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -1298,6 +1447,8 @@ mod tests { impl Subsystem for TestSubsystem5 where C: SubsystemContext { + type Metrics = (); + fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); @@ -1327,6 +1478,8 @@ mod tests { impl Subsystem for TestSubsystem6 where C: SubsystemContext { + type Metrics = (); + fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); @@ -1400,6 +1553,7 @@ mod tests { let (overseer, mut handler) = Overseer::new( vec![first_block], all_subsystems, + None, spawner, ).unwrap(); @@ -1505,6 +1659,7 @@ mod tests { let (overseer, mut handler) = Overseer::new( vec![first_block, second_block], all_subsystems, + None, spawner, ).unwrap(); @@ -1592,6 +1747,8 @@ mod tests { C: SubsystemContext, M: Send, { + type Metrics = (); + fn start(self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { name: "counter-subsystem", @@ -1738,6 +1895,7 @@ mod tests { let (overseer, mut handler) = Overseer::new( vec![], all_subsystems, + None, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff 
--git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 0489cde65b..9c854525c0 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -277,6 +277,7 @@ fn new_partial(config: &mut Configuration) -> Result< fn real_overseer( leaves: impl IntoIterator, + prometheus_registry: Option<&Registry>, s: S, ) -> Result<(Overseer, OverseerHandler), ServiceError> { let all_subsystems = AllSubsystems { @@ -296,9 +297,11 @@ fn real_overseer( collation_generation: DummySubsystem, collator_protocol: DummySubsystem, }; + Overseer::new( leaves, all_subsystems, + prometheus_registry, s, ).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) } @@ -399,7 +402,7 @@ fn new_full( }) .collect(); - let (overseer, handler) = real_overseer(leaves, spawner)?; + let (overseer, handler) = real_overseer(leaves, prometheus_registry.as_ref(), spawner)?; let handler_clone = handler.clone(); task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move { @@ -502,7 +505,7 @@ fn new_full( inherent_data_providers: inherent_data_providers.clone(), telemetry_on_connect: Some(telemetry_connection_sinks.on_connect_stream()), voting_rule, - prometheus_registry: prometheus_registry, + prometheus_registry, shared_voter_state, }; @@ -533,7 +536,7 @@ fn new_light(mut config: Configuration) -> Result>, Dispatch: NativeExecutionDispatch + 'static, { - crate::set_prometheus_registry(&mut config)?; + set_prometheus_registry(&mut config)?; use sc_client_api::backend::RemoteBackend; let (client, backend, keystore, mut task_manager, on_demand) = diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index d6998b267f..57f1efed3b 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -24,6 +24,7 @@ use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, messages::{AllMessages, RuntimeApiMessage, 
RuntimeApiRequest, RuntimeApiSender}, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, + metrics, }; use futures::{ channel::{mpsc, oneshot}, @@ -63,11 +64,13 @@ pub mod reexports { }; } + /// Duration a job will wait after sending a stop signal before hard-aborting. pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); /// Capacity of channels to and from individual jobs pub const JOB_CHANNEL_CAPACITY: usize = 64; + /// Utility errors #[derive(Debug, derive_more::From)] pub enum Error { @@ -446,6 +449,12 @@ pub trait JobTrait: Unpin { /// /// If no extra information is needed, it is perfectly acceptable to set it to `()`. type RunArgs: 'static + Send; + /// Subsystem-specific Prometheus metrics. + /// + /// Jobs spawned by one subsystem should share the same + /// instance of metrics (use `.clone()`). + /// The `delegated_subsystem!` macro should take care of this. + type Metrics: 'static + metrics::Metrics + Send; /// Name of the job, i.e. `CandidateBackingJob` const NAME: &'static str; @@ -454,6 +463,7 @@ pub trait JobTrait: Unpin { fn run( parent: Hash, run_args: Self::RunArgs, + metrics: Self::Metrics, receiver: mpsc::Receiver, sender: mpsc::Sender, ) -> Pin> + Send>>; @@ -532,7 +542,7 @@ impl Jobs { } /// Spawn a new job for this `parent_hash`, with whatever args are appropriate. 
- fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs) -> Result<(), Error> { + fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> { let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (finished_tx, finished) = oneshot::channel(); @@ -541,7 +551,7 @@ impl Jobs { let err_tx = self.errors.clone(); let (future, abort_handle) = future::abortable(async move { - if let Err(e) = Job::run(parent_hash, run_args, to_job_rx, from_job_tx).await { + if let Err(e) = Job::run(parent_hash, run_args, metrics, to_job_rx, from_job_tx).await { log::error!( "{}({}) finished with an error {:?}", Job::NAME, @@ -648,6 +658,7 @@ where pub struct JobManager { spawner: Spawner, run_args: Job::RunArgs, + metrics: Job::Metrics, context: std::marker::PhantomData, job: std::marker::PhantomData, errors: Option, JobsError)>>, @@ -662,10 +673,11 @@ where Job::ToJob: TryFrom + TryFrom<::Message> + Sync, { /// Creates a new `Subsystem`. - pub fn new(spawner: Spawner, run_args: Job::RunArgs) -> Self { + pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self { Self { spawner, run_args, + metrics, context: std::marker::PhantomData, job: std::marker::PhantomData, errors: None, @@ -703,6 +715,7 @@ where pub async fn run( mut ctx: Context, run_args: Job::RunArgs, + metrics: Job::Metrics, spawner: Spawner, mut err_tx: Option, JobsError)>>, ) { @@ -714,7 +727,7 @@ where loop { select! 
{ - incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &mut err_tx).await { break }, + incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break }, outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break }, complete => break, } @@ -741,6 +754,7 @@ where incoming: SubsystemResult>, jobs: &mut Jobs, run_args: &Job::RunArgs, + metrics: &Job::Metrics, err_tx: &mut Option, JobsError)>>, ) -> bool { use polkadot_node_subsystem::ActiveLeavesUpdate; @@ -753,7 +767,8 @@ where deactivated, }))) => { for hash in activated { - if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { + let metrics = metrics.clone(); + if let Err(e) = jobs.spawn_job(hash, run_args.clone(), metrics) { log::error!("Failed to spawn a job: {:?}", e); Self::fwd_err(Some(hash), e.into(), err_tx).await; return true; @@ -849,14 +864,18 @@ where Job: 'static + JobTrait + Send, Job::RunArgs: Clone + Sync, Job::ToJob: TryFrom + Sync, + Job::Metrics: Sync, { + type Metrics = Job::Metrics; + fn start(self, ctx: Context) -> SpawnedSubsystem { let spawner = self.spawner.clone(); let run_args = self.run_args.clone(); + let metrics = self.metrics.clone(); let errors = self.errors; let future = Box::pin(async move { - Self::run(ctx, run_args, spawner, errors).await; + Self::run(ctx, run_args, metrics, spawner, errors).await; }); SpawnedSubsystem { @@ -901,11 +920,11 @@ where /// ``` #[macro_export] macro_rules! 
delegated_subsystem { - ($job:ident($run_args:ty) <- $to_job:ty as $subsystem:ident) => { - delegated_subsystem!($job($run_args) <- $to_job as $subsystem; stringify!($subsystem)); + ($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident) => { + delegated_subsystem!($job($run_args, $metrics) <- $to_job as $subsystem; stringify!($subsystem)); }; - ($job:ident($run_args:ty) <- $to_job:ty as $subsystem:ident; $subsystem_name:expr) => { + ($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident; $subsystem_name:expr) => { #[doc = "Manager type for the "] #[doc = $subsystem_name] type Manager = $crate::JobManager; @@ -924,15 +943,15 @@ macro_rules! delegated_subsystem { { #[doc = "Creates a new "] #[doc = $subsystem_name] - pub fn new(spawner: Spawner, run_args: $run_args) -> Self { + pub fn new(spawner: Spawner, run_args: $run_args, metrics: $metrics) -> Self { $subsystem { - manager: $crate::JobManager::new(spawner, run_args) + manager: $crate::JobManager::new(spawner, run_args, metrics) } } /// Run this subsystem - pub async fn run(ctx: Context, run_args: $run_args, spawner: Spawner) { - >::run(ctx, run_args, spawner, None).await + pub async fn run(ctx: Context, run_args: $run_args, metrics: $metrics, spawner: Spawner) { + >::run(ctx, run_args, metrics, spawner, None).await } } @@ -942,6 +961,8 @@ macro_rules! delegated_subsystem { Context: $crate::reexports::SubsystemContext, ::Message: Into<$to_job>, { + type Metrics = $metrics; + fn start(self, ctx: Context) -> $crate::reexports::SpawnedSubsystem { self.manager.start(ctx) } @@ -1061,6 +1082,7 @@ mod tests { // RunArgs get cloned so that each job gets its own owned copy. If you need that, wrap it in // an Arc. Within a testing context, that efficiency is less important. 
type RunArgs = HashMap>; + type Metrics = (); const NAME: &'static str = "FakeCandidateSelectionJob"; @@ -1070,6 +1092,7 @@ mod tests { fn run( parent: Hash, mut run_args: Self::RunArgs, + _metrics: Self::Metrics, receiver: mpsc::Receiver, mut sender: mpsc::Sender, ) -> Pin> + Send>> { @@ -1121,7 +1144,7 @@ mod tests { let (context, overseer_handle) = make_subsystem_context(pool.clone()); let (err_tx, err_rx) = mpsc::channel(16); - let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, pool, Some(err_tx)); + let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx)); let test_future = test(overseer_handle, err_rx); let timeout = Delay::new(Duration::from_secs(2)); @@ -1196,7 +1219,7 @@ mod tests { let (context, _) = make_subsystem_context::(pool.clone()); let SpawnedSubsystem { name, .. } = - FakeCandidateSelectionSubsystem::new(pool, HashMap::new()).start(context); + FakeCandidateSelectionSubsystem::new(pool, HashMap::new(), ()).start(context); assert_eq!(name, "FakeCandidateSelection"); } } diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index 6ad013c177..f7283d40aa 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -21,6 +21,7 @@ polkadot-statement-table = { path = "../../statement-table" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } smallvec = "1.4.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } [dev-dependencies] assert_matches = "1.3.0" diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 97d09f88cd..77ad27a2c1 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -19,6 +19,8 @@ //! Node-side logic for Polkadot is mostly comprised of Subsystems, which are discrete components //! 
that communicate via message-passing. They are coordinated by an overseer, provided by a //! separate crate. +//! +//! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems. #![warn(missing_docs)] @@ -128,9 +130,9 @@ impl From for SubsystemError { } impl From for SubsystemError { - fn from(_: futures::task::SpawnError) -> Self { + fn from(_: futures::task::SpawnError) -> Self { Self - } + } } impl From for SubsystemError { @@ -202,6 +204,9 @@ pub trait SubsystemContext: Send + 'static { /// [`Overseer`]: struct.Overseer.html /// [`Subsystem`]: trait.Subsystem.html pub trait Subsystem { + /// Subsystem-specific Prometheus metrics. + type Metrics: metrics::Metrics; + /// Start this `Subsystem` and return `SpawnedSubsystem`. fn start(self, ctx: C) -> SpawnedSubsystem; } @@ -211,6 +216,8 @@ pub trait Subsystem { pub struct DummySubsystem; impl Subsystem for DummySubsystem { + type Metrics = (); + fn start(self, mut ctx: C) -> SpawnedSubsystem { let future = Box::pin(async move { loop { @@ -227,4 +234,41 @@ impl Subsystem for DummySubsystem { future, } } -} \ No newline at end of file +} + +/// This module reexports Prometheus types and defines the [`Metrics`] trait. +pub mod metrics { + /// Reexport Prometheus types. + pub use substrate_prometheus_endpoint as prometheus; + + /// Subsystem- or job-specific Prometheus metrics. + /// + /// Usually implemented as a wrapper for `Option` + /// to ensure `Default` bounds or as a dummy type (). + /// Prometheus metrics internally hold an `Arc` reference, so cloning them is fine. + pub trait Metrics: Default + Clone { + /// Try to register metrics in the Prometheus registry. + fn try_register(registry: &prometheus::Registry) -> Result; + + /// Convenience method to register metrics in the optional Prometheus registry. + /// If the registration fails, prints a warning and returns `Default::default()`. 
+ fn register(registry: Option<&prometheus::Registry>) -> Self { + registry.map(|r| { + match Self::try_register(r) { + Err(e) => { + log::warn!("Failed to register metrics: {:?}", e); + Default::default() + }, + Ok(metrics) => metrics, + } + }).unwrap_or_default() + } + } + + // dummy impl + impl Metrics for () { + fn try_register(_registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } + } +}