diff --git a/substrate/client/finality-grandpa/src/environment.rs b/substrate/client/finality-grandpa/src/environment.rs index eb80ad30ac..dec9492482 100644 --- a/substrate/client/finality-grandpa/src/environment.rs +++ b/substrate/client/finality-grandpa/src/environment.rs @@ -636,6 +636,7 @@ where self.client.clone(), incoming, "round", + None, ).map_err(Into::into)); // schedule network message cleanup when sink drops. diff --git a/substrate/client/finality-grandpa/src/lib.rs b/substrate/client/finality-grandpa/src/lib.rs index 967584fa9b..d8e5846a56 100644 --- a/substrate/client/finality-grandpa/src/lib.rs +++ b/substrate/client/finality-grandpa/src/lib.rs @@ -63,6 +63,7 @@ use sc_client_api::{ }; use sp_blockchain::{HeaderBackend, Error as ClientError, HeaderMetadata}; use parity_scale_codec::{Decode, Encode}; +use prometheus_endpoint::{PrometheusError, Registry}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{NumberFor, Block as BlockT, DigestFor, Zero}; use sc_keystore::KeyStorePtr; @@ -104,7 +105,7 @@ pub use voting_rule::{ }; use aux_schema::PersistentData; -use environment::{Environment, VoterSetState, Metrics}; +use environment::{Environment, VoterSetState}; use import::GrandpaBlockImport; use until_imported::UntilGlobalMessageBlocksImported; use communication::{NetworkBridge, Network as NetworkT}; @@ -519,6 +520,7 @@ fn global_communication( client: Arc, network: &NetworkBridge, keystore: &Option, + metrics: Option, ) -> ( impl Stream< Item = Result, CommandOrError>>, @@ -549,6 +551,7 @@ fn global_communication( client.clone(), global_in, "global", + metrics, ); let global_in = global_in.map_err(CommandOrError::from); @@ -696,6 +699,20 @@ pub fn run_grandpa_voter( Ok(future::select(voter_work, telemetry_task).map(drop)) } +struct Metrics { + environment: environment::Metrics, + until_imported: until_imported::Metrics, +} + +impl Metrics { + fn register(registry: &Registry) -> Result { + Ok(Metrics { + environment: environment::Metrics::register(registry)?, + until_imported: until_imported::Metrics::register(registry)?, + }) + } +} + /// Future that powers the voter. #[must_use] struct VoterWork, SC, VR> { @@ -703,6 +720,9 @@ struct VoterWork, SC, VR> { env: Arc>, voter_commands_rx: mpsc::UnboundedReceiver>>, network: NetworkBridge, + + /// Prometheus metrics. + metrics: Option, } impl VoterWork @@ -725,6 +745,14 @@ where voter_commands_rx: mpsc::UnboundedReceiver>>, prometheus_registry: Option, ) -> Self { + let metrics = match prometheus_registry.as_ref().map(Metrics::register) { + Some(Ok(metrics)) => Some(metrics), + Some(Err(e)) => { + debug!(target: "afg", "Failed to register metrics: {:?}", e); + None + } + None => None, + }; let voters = persistent_data.authority_set.current_authorities(); let env = Arc::new(Environment { @@ -738,10 +766,7 @@ where authority_set: persistent_data.authority_set.clone(), consensus_changes: persistent_data.consensus_changes.clone(), voter_set_state: persistent_data.set_state.clone(), - metrics: prometheus_registry.map(|registry| { - Metrics::register(®istry) - .expect("Other metrics would have failed to register before these; qed") - }), + metrics: metrics.as_ref().map(|m| m.environment.clone()), _phantom: PhantomData, }); @@ -752,6 +777,7 @@ where env, voter_commands_rx, network, + metrics, }; work.rebuild_voter(); work @@ -800,6 +826,7 @@ where self.env.client.clone(), &self.env.network, &self.env.config.keystore, + self.metrics.as_ref().map(|m| m.until_imported.clone()), ); let last_completed_round = completed_rounds.last(); diff --git a/substrate/client/finality-grandpa/src/observer.rs b/substrate/client/finality-grandpa/src/observer.rs index 07eaf1db9f..2382c6e249 100644 --- a/substrate/client/finality-grandpa/src/observer.rs +++ b/substrate/client/finality-grandpa/src/observer.rs @@ -255,6 +255,7 @@ where self.client.clone(), &self.network, &self.keystore, + None, ); let last_finalized_number = self.client.info().finalized_number; diff --git a/substrate/client/finality-grandpa/src/until_imported.rs b/substrate/client/finality-grandpa/src/until_imported.rs index 223078ec92..95bcceaded 100644 --- a/substrate/client/finality-grandpa/src/until_imported.rs +++ b/substrate/client/finality-grandpa/src/until_imported.rs @@ -29,13 +29,17 @@ use super::{ }; use log::{debug, warn}; -use sc_client_api::{BlockImportNotification, ImportNotifications}; use futures::prelude::*; use futures::stream::Fuse; use futures_timer::Delay; use futures::channel::mpsc::UnboundedReceiver; use finality_grandpa::voter; use parking_lot::Mutex; +use prometheus_endpoint::{ + Gauge, U64, PrometheusError, register, Registry, +}; +use sc_client_api::{BlockImportNotification, ImportNotifications}; +use sp_finality_grandpa::AuthorityId; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::collections::{HashMap, VecDeque}; @@ -43,7 +47,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use sp_finality_grandpa::AuthorityId; const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15); @@ -77,6 +80,63 @@ pub(crate) enum DiscardWaitOrReady { Ready(R), } +/// Prometheus metrics for the `UntilImported` queue. +// +// At a given point in time there can be more than one `UntilImported` queue. One can not register a +// metric twice, thus queues need to share the same Prometheus metrics instead of instantiating +// their own ones. +// +// When a queue is dropped it might still contain messages. In order for those to not distort the +// Prometheus metrics, the `Metric` struct cleans up after itself within its `Drop` implementation +// by subtracting the local_waiting_messages (the amount of messages left in the queue about to +// be dropped) from the global_waiting_messages gauge. +pub(crate) struct Metrics { + global_waiting_messages: Gauge, + local_waiting_messages: u64, +} + +impl Metrics { + pub(crate) fn register(registry: &Registry) -> Result { + Ok(Self { + global_waiting_messages: register(Gauge::new( + "finality_grandpa_until_imported_waiting_messages_number", + "Number of finality grandpa messages waiting within the until imported queue.", + )?, registry)?, + local_waiting_messages: 0, + }) + } + + fn waiting_messages_inc(&mut self) { + self.local_waiting_messages += 1; + self.global_waiting_messages.inc(); + } + + fn waiting_messages_dec(&mut self) { + self.local_waiting_messages -= 1; + self.global_waiting_messages.dec(); + } +} + + +impl Clone for Metrics { + fn clone(&self) -> Self { + Metrics { + global_waiting_messages: self.global_waiting_messages.clone(), + // When cloned, reset local_waiting_messages, so the global counter is not reduced a + // second time for the same messages on `drop` of the clone. + local_waiting_messages: 0, + } + } +} + +impl Drop for Metrics { + fn drop(&mut self) { + // Reduce the global counter by the amount of messages that were still left in the dropped + // queue. + self.global_waiting_messages.sub(self.local_waiting_messages) + } +} + /// Buffering imported messages until blocks with given hashes are imported. #[pin_project::pin_project] pub(crate) struct UntilImported> { @@ -86,12 +146,17 @@ pub(crate) struct UntilImported, ready: VecDeque, + /// Interval at which to check status of each awaited block. check_pending: Pin> + Send>>, /// Mapping block hashes to their block number, the point in time it was /// first encountered (Instant) and a list of GRANDPA messages referencing /// the block hash. pending: HashMap, Instant, Vec)>, + + /// Queue identifier for differentiation in logs. identifier: &'static str, + /// Prometheus metrics. + metrics: Option, } impl UntilImported where @@ -108,6 +173,7 @@ impl UntilImported, ) -> Self { // how often to check if pending messages that are waiting for blocks to be // imported can be checked. @@ -131,6 +197,7 @@ impl UntilImported Stream for UntilImported this.ready.push_back(item), } + + if let Some(metrics) = &mut this.metrics { + metrics.waiting_messages_inc(); + } } Poll::Pending => break, } @@ -238,6 +309,9 @@ impl Stream for UntilImported