client/finality-grandpa: Instrument until-imported queue (#5438)

* client/finality-grandpa: Instrument until-imported queue

The `UntilImported` queue takes as input finality grandpa messages that
depend on blocks that are not yet imported and holds them back until
those blocks are imported.

This commit adds a basic metric to the module: the number of messages
waiting in the queue. For now this metric is only available for the
global `UntilImported` queue awaiting blocks for commit and catch-up
messages.

* client/finality-grandpa/src/until_imported: Update metric help text

Co-Authored-By: Ashley <ashley.ruglys@gmail.com>

Co-authored-by: Ashley <ashley.ruglys@gmail.com>
This commit is contained in:
Max Inden
2020-03-31 20:17:14 +02:00
committed by GitHub
parent f04486e5bc
commit 091335780e
4 changed files with 134 additions and 7 deletions
@@ -636,6 +636,7 @@ where
self.client.clone(),
incoming,
"round",
None,
).map_err(Into::into));
// schedule network message cleanup when sink drops.
+32 -5
View File
@@ -63,6 +63,7 @@ use sc_client_api::{
};
use sp_blockchain::{HeaderBackend, Error as ClientError, HeaderMetadata};
use parity_scale_codec::{Decode, Encode};
use prometheus_endpoint::{PrometheusError, Registry};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{NumberFor, Block as BlockT, DigestFor, Zero};
use sc_keystore::KeyStorePtr;
@@ -104,7 +105,7 @@ pub use voting_rule::{
};
use aux_schema::PersistentData;
use environment::{Environment, VoterSetState, Metrics};
use environment::{Environment, VoterSetState};
use import::GrandpaBlockImport;
use until_imported::UntilGlobalMessageBlocksImported;
use communication::{NetworkBridge, Network as NetworkT};
@@ -519,6 +520,7 @@ fn global_communication<BE, Block: BlockT, C, N>(
client: Arc<C>,
network: &NetworkBridge<Block, N>,
keystore: &Option<KeyStorePtr>,
metrics: Option<until_imported::Metrics>,
) -> (
impl Stream<
Item = Result<CommunicationInH<Block, Block::Hash>, CommandOrError<Block::Hash, NumberFor<Block>>>,
@@ -549,6 +551,7 @@ fn global_communication<BE, Block: BlockT, C, N>(
client.clone(),
global_in,
"global",
metrics,
);
let global_in = global_in.map_err(CommandOrError::from);
@@ -696,6 +699,20 @@ pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, SC, VR>(
Ok(future::select(voter_work, telemetry_task).map(drop))
}
/// Prometheus metrics of the GRANDPA voter, bundling the metrics of its sub-components so they
/// can be registered together and handed out individually.
struct Metrics {
/// Metrics passed (as a clone) to the voter `Environment`.
environment: environment::Metrics,
/// Metrics passed (as a clone) to the global `UntilImported` queue.
until_imported: until_imported::Metrics,
}
impl Metrics {
    /// Registers the metrics of every sub-component with the given Prometheus `registry`.
    ///
    /// The environment metrics are registered first, then the until-imported metrics. The first
    /// registration error encountered is returned to the caller.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let environment = environment::Metrics::register(registry)?;
        let until_imported = until_imported::Metrics::register(registry)?;

        Ok(Self { environment, until_imported })
    }
}
/// Future that powers the voter.
#[must_use]
struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, SC, VR> {
@@ -703,6 +720,9 @@ struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, SC, VR> {
env: Arc<Environment<B, Block, C, N, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: NetworkBridge<Block, N>,
/// Prometheus metrics.
metrics: Option<Metrics>,
}
impl<B, Block, C, N, SC, VR> VoterWork<B, Block, C, N, SC, VR>
@@ -725,6 +745,14 @@ where
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> Self {
let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
debug!(target: "afg", "Failed to register metrics: {:?}", e);
None
}
None => None,
};
let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
@@ -738,10 +766,7 @@ where
authority_set: persistent_data.authority_set.clone(),
consensus_changes: persistent_data.consensus_changes.clone(),
voter_set_state: persistent_data.set_state.clone(),
metrics: prometheus_registry.map(|registry| {
Metrics::register(&registry)
.expect("Other metrics would have failed to register before these; qed")
}),
metrics: metrics.as_ref().map(|m| m.environment.clone()),
_phantom: PhantomData,
});
@@ -752,6 +777,7 @@ where
env,
voter_commands_rx,
network,
metrics,
};
work.rebuild_voter();
work
@@ -800,6 +826,7 @@ where
self.env.client.clone(),
&self.env.network,
&self.env.config.keystore,
self.metrics.as_ref().map(|m| m.until_imported.clone()),
);
let last_completed_round = completed_rounds.last();
@@ -255,6 +255,7 @@ where
self.client.clone(),
&self.network,
&self.keystore,
None,
);
let last_finalized_number = self.client.info().finalized_number;
@@ -29,13 +29,17 @@ use super::{
};
use log::{debug, warn};
use sc_client_api::{BlockImportNotification, ImportNotifications};
use futures::prelude::*;
use futures::stream::Fuse;
use futures_timer::Delay;
use futures::channel::mpsc::UnboundedReceiver;
use finality_grandpa::voter;
use parking_lot::Mutex;
use prometheus_endpoint::{
Gauge, U64, PrometheusError, register, Registry,
};
use sc_client_api::{BlockImportNotification, ImportNotifications};
use sp_finality_grandpa::AuthorityId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::collections::{HashMap, VecDeque};
@@ -43,7 +47,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use sp_finality_grandpa::AuthorityId;
const LOG_PENDING_INTERVAL: Duration = Duration::from_secs(15);
@@ -77,6 +80,63 @@ pub(crate) enum DiscardWaitOrReady<Block: BlockT, W, R> {
Ready(R),
}
/// Prometheus metrics for the `UntilImported` queue.
//
// At a given point in time there can be more than one `UntilImported` queue. One can not register a
// metric twice, thus queues need to share the same Prometheus metrics instead of instantiating
// their own ones.
//
// When a queue is dropped it might still contain messages. In order for those to not distort the
// Prometheus metrics, the `Metrics` struct cleans up after itself within its `Drop` implementation
// by subtracting the local_waiting_messages (the number of messages left in the queue about to
// be dropped) from the global_waiting_messages gauge.
pub(crate) struct Metrics {
/// Gauge shared between all clones of this struct, counting the waiting messages of all queues.
global_waiting_messages: Gauge<U64>,
/// Number of waiting messages counted through this instance only; subtracted from the shared
/// gauge when this instance is dropped.
local_waiting_messages: u64,
}
impl Metrics {
pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
global_waiting_messages: register(Gauge::new(
"finality_grandpa_until_imported_waiting_messages_number",
"Number of finality grandpa messages waiting within the until imported queue.",
)?, registry)?,
local_waiting_messages: 0,
})
}
fn waiting_messages_inc(&mut self) {
self.local_waiting_messages += 1;
self.global_waiting_messages.inc();
}
fn waiting_messages_dec(&mut self) {
self.local_waiting_messages -= 1;
self.global_waiting_messages.dec();
}
}
impl Clone for Metrics {
fn clone(&self) -> Self {
Metrics {
global_waiting_messages: self.global_waiting_messages.clone(),
// When cloned, reset local_waiting_messages, so the global counter is not reduced a
// second time for the same messages on `drop` of the clone.
local_waiting_messages: 0,
}
}
}
impl Drop for Metrics {
    /// Removes the messages still accounted to this instance from the shared gauge.
    fn drop(&mut self) {
        // Messages that never left the dropped queue would otherwise stay in the shared gauge
        // forever.
        let left_in_queue = self.local_waiting_messages;
        self.global_waiting_messages.sub(left_in_queue);
    }
}
/// Buffering imported messages until blocks with given hashes are imported.
#[pin_project::pin_project]
pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
@@ -86,12 +146,17 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
#[pin]
inner: Fuse<I>,
ready: VecDeque<M::Blocked>,
/// Interval at which to check status of each awaited block.
check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send>>,
/// Mapping block hashes to their block number, the point in time it was
/// first encountered (Instant) and a list of GRANDPA messages referencing
/// the block hash.
pending: HashMap<Block::Hash, (NumberFor<Block>, Instant, Vec<M>)>,
/// Queue identifier for differentiation in logs.
identifier: &'static str,
/// Prometheus metrics.
metrics: Option<Metrics>,
}
impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
@@ -108,6 +173,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
status_check: BlockStatus,
stream: I,
identifier: &'static str,
metrics: Option<Metrics>,
) -> Self {
// how often to check if pending messages that are waiting for blocks to be
// imported can be checked.
@@ -131,6 +197,7 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
check_pending: Box::pin(check_pending),
pending: HashMap::new(),
identifier,
metrics,
}
}
}
@@ -168,6 +235,10 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
},
DiscardWaitOrReady::Ready(item) => this.ready.push_back(item),
}
if let Some(metrics) = &mut this.metrics {
metrics.waiting_messages_inc();
}
}
Poll::Pending => break,
}
@@ -238,6 +309,9 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
}
if let Some(ready) = this.ready.pop_front() {
if let Some(metrics) = &mut this.metrics {
metrics.waiting_messages_dec();
}
return Poll::Ready(Some(Ok(ready)))
}
@@ -583,6 +657,7 @@ mod tests {
block_status,
global_rx,
"global",
None,
);
global_tx.unbounded_send(msg).unwrap();
@@ -609,6 +684,7 @@ mod tests {
block_status,
global_rx,
"global",
None,
);
global_tx.unbounded_send(msg).unwrap();
@@ -863,6 +939,7 @@ mod tests {
block_status,
global_rx,
"global",
None,
);
let h1 = make_header(5);
@@ -986,4 +1063,25 @@ mod tests {
// block number mismatch this should return None.
assert!(waiting_block_2.wait_completed(2).is_none());
}
#[test]
fn metrics_cleans_up_after_itself() {
    let registry = Registry::new();

    // Two handles standing in for two queues that share one registered gauge.
    let mut dropped_queue = Metrics::register(&registry).unwrap();
    let surviving_queue = dropped_queue.clone();

    // A message enters the first 'queue'; the second handle observes it through the shared
    // gauge.
    dropped_queue.waiting_messages_inc();
    assert_eq!(1, surviving_queue.global_waiting_messages.get());

    // Dropping the first 'queue' while it still holds the message must remove that message
    // from the shared gauge.
    drop(dropped_queue);
    assert_eq!(0, surviving_queue.global_waiting_messages.get());
}
}