client/finality-grandpa: Add Prometheus metrics to GossipValidator (#5237)

* client/finality-grandpa: Add Prometheus metrics to GossipValidator

Instrument finality grandpa `GossipValidator` exposing count of messages
validated by message type and message action.

```
\# HELP substrate_finality_grandpa_communication_gossip_validator_messages Number of messages validated by the finality grandpa gossip validator.
\# TYPE substrate_finality_grandpa_communication_gossip_validator_messages counter
substrate_finality_grandpa_communication_gossip_validator_messages{action="discard",message="neighbor"} 39
substrate_finality_grandpa_communication_gossip_validator_messages{action="keep",message="vote"} 28
```

* client/finality-grandpa: Add None as Prometheus registry in tests

* client/finality-granpda/src/communication: Refactor metric registration
This commit is contained in:
Max Inden
2020-03-14 12:51:44 +01:00
committed by GitHub
parent b817763ea9
commit 79fc16e4d7
6 changed files with 86 additions and 7 deletions
@@ -91,6 +91,7 @@ use sp_finality_grandpa::AuthorityId;
use sc_telemetry::{telemetry, CONSENSUS_DEBUG}; use sc_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug}; use log::{trace, debug};
use futures::channel::mpsc; use futures::channel::mpsc;
use prometheus_endpoint::{CounterVec, Opts, PrometheusError, register, Registry, U64};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use crate::{environment, CatchUp, CompactCommit, SignedMessage};
@@ -1197,11 +1198,34 @@ impl<Block: BlockT> Inner<Block> {
} }
} }
// Prometheus metrics for [`GossipValidator`].
pub(crate) struct Metrics {
messages_validated: CounterVec<U64>,
}
impl Metrics {
pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self, PrometheusError> {
Ok(Self {
messages_validated: register(
CounterVec::new(
Opts::new(
"finality_grandpa_communication_gossip_validator_messages",
"Number of messages validated by the finality grandpa gossip validator."
),
&["message", "action"]
)?,
registry,
)?,
})
}
}
/// A validator for GRANDPA gossip messages. /// A validator for GRANDPA gossip messages.
pub(super) struct GossipValidator<Block: BlockT> { pub(super) struct GossipValidator<Block: BlockT> {
inner: parking_lot::RwLock<Inner<Block>>, inner: parking_lot::RwLock<Inner<Block>>,
set_state: environment::SharedVoterSetState<Block>, set_state: environment::SharedVoterSetState<Block>,
report_sender: mpsc::UnboundedSender<PeerReport>, report_sender: mpsc::UnboundedSender<PeerReport>,
metrics: Option<Metrics>,
} }
impl<Block: BlockT> GossipValidator<Block> { impl<Block: BlockT> GossipValidator<Block> {
@@ -1211,12 +1235,23 @@ impl<Block: BlockT> GossipValidator<Block> {
pub(super) fn new( pub(super) fn new(
config: crate::Config, config: crate::Config,
set_state: environment::SharedVoterSetState<Block>, set_state: environment::SharedVoterSetState<Block>,
prometheus_registry: Option<&Registry>,
) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) { ) -> (GossipValidator<Block>, mpsc::UnboundedReceiver<PeerReport>) {
let metrics = match prometheus_registry.map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
debug!(target: "afg", "Failed to register metrics: {:?}", e);
None
},
None => None,
};
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let val = GossipValidator { let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)), inner: parking_lot::RwLock::new(Inner::new(config)),
set_state, set_state,
report_sender: tx, report_sender: tx,
metrics: metrics,
}; };
(val, rx) (val, rx)
@@ -1279,12 +1314,21 @@ impl<Block: BlockT> GossipValidator<Block> {
let mut broadcast_topics = Vec::new(); let mut broadcast_topics = Vec::new();
let mut peer_reply = None; let mut peer_reply = None;
// Message name for Prometheus metric recording.
let message_name;
let action = { let action = {
match GossipMessage::<Block>::decode(&mut data) { match GossipMessage::<Block>::decode(&mut data) {
Ok(GossipMessage::Vote(ref message)) Ok(GossipMessage::Vote(ref message)) => {
=> self.inner.write().validate_round_message(who, message), message_name = Some("vote");
Ok(GossipMessage::Commit(ref message)) => self.inner.write().validate_commit_message(who, message), self.inner.write().validate_round_message(who, message)
},
Ok(GossipMessage::Commit(ref message)) => {
message_name = Some("commit");
self.inner.write().validate_commit_message(who, message)
},
Ok(GossipMessage::Neighbor(update)) => { Ok(GossipMessage::Neighbor(update)) => {
message_name = Some("neighbor");
let (topics, action, catch_up, report) = self.inner.write().import_neighbor_message( let (topics, action, catch_up, report) = self.inner.write().import_neighbor_message(
who, who,
update.into_neighbor_packet(), update.into_neighbor_packet(),
@@ -1298,9 +1342,12 @@ impl<Block: BlockT> GossipValidator<Block> {
peer_reply = catch_up; peer_reply = catch_up;
action action
} }
Ok(GossipMessage::CatchUp(ref message)) Ok(GossipMessage::CatchUp(ref message)) => {
=> self.inner.write().validate_catch_up_message(who, message), message_name = Some("catch_up");
self.inner.write().validate_catch_up_message(who, message)
},
Ok(GossipMessage::CatchUpRequest(request)) => { Ok(GossipMessage::CatchUpRequest(request)) => {
message_name = Some("catch_up_request");
let (reply, action) = self.inner.write().handle_catch_up_request( let (reply, action) = self.inner.write().handle_catch_up_request(
who, who,
request, request,
@@ -1311,6 +1358,7 @@ impl<Block: BlockT> GossipValidator<Block> {
action action
} }
Err(e) => { Err(e) => {
message_name = None;
debug!(target: "afg", "Error decoding message: {}", e.what()); debug!(target: "afg", "Error decoding message: {}", e.what());
telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => ""); telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => "");
@@ -1320,6 +1368,16 @@ impl<Block: BlockT> GossipValidator<Block> {
} }
}; };
// Prometheus metric recording.
if let (Some(metrics), Some(message_name)) = (&self.metrics, message_name) {
let action_name = match action {
Action::Keep(_, _) => "keep",
Action::ProcessAndDiscard(_, _) => "process_and_discard",
Action::Discard(_) => "discard",
};
metrics.messages_validated.with_label_values(&[message_name, action_name]).inc();
}
(action, broadcast_topics, peer_reply) (action, broadcast_topics, peer_reply)
} }
} }
@@ -1679,6 +1737,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
let set_id = 1; let set_id = 1;
@@ -1714,6 +1773,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
let set_id = 1; let set_id = 1;
let auth = AuthorityId::from_slice(&[1u8; 32]); let auth = AuthorityId::from_slice(&[1u8; 32]);
@@ -1758,6 +1818,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
let set_id = 1; let set_id = 1;
@@ -1826,6 +1887,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
set_state.clone(), set_state.clone(),
None,
); );
let set_id = 1; let set_id = 1;
@@ -1880,6 +1942,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
set_state.clone(), set_state.clone(),
None,
); );
// the validator starts at set id 2 // the validator starts at set id 2
@@ -1959,6 +2022,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
// the validator starts at set id 1. // the validator starts at set id 1.
@@ -2032,6 +2096,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config, config,
voter_set_state(), voter_set_state(),
None,
); );
// the validator starts at set id 1. // the validator starts at set id 1.
@@ -2065,6 +2130,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
// the validator starts at set id 1. // the validator starts at set id 1.
@@ -2124,6 +2190,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config, config,
voter_set_state(), voter_set_state(),
None,
); );
// the validator starts at set id 1. // the validator starts at set id 1.
@@ -2162,6 +2229,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
// the validator starts at set id 1. // the validator starts at set id 1.
@@ -2194,6 +2262,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config, config,
voter_set_state(), voter_set_state(),
None,
); );
// the validator start at set id 0 // the validator start at set id 0
@@ -2271,6 +2340,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
// the validator start at set id 0 // the validator start at set id 0
@@ -2310,6 +2380,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new( let (val, _) = GossipValidator::<Block>::new(
config, config,
voter_set_state(), voter_set_state(),
None,
); );
// the validator start at set id 0 // the validator start at set id 0
@@ -2358,7 +2429,7 @@ mod tests {
#[test] #[test]
fn only_gossip_commits_to_peers_on_same_set() { fn only_gossip_commits_to_peers_on_same_set() {
let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state()); let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None);
// the validator start at set id 1 // the validator start at set id 1
val.note_set(SetId(1), Vec::new(), |_, _| {}); val.note_set(SetId(1), Vec::new(), |_, _| {});
@@ -2436,7 +2507,7 @@ mod tests {
#[test] #[test]
fn expire_commits_from_older_rounds() { fn expire_commits_from_older_rounds() {
let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state()); let (val, _) = GossipValidator::<Block>::new(config(), voter_set_state(), None);
let commit = |round, set_id, target_number| { let commit = |round, set_id, target_number| {
let commit = finality_grandpa::CompactCommit { let commit = finality_grandpa::CompactCommit {
@@ -30,6 +30,7 @@
use futures::{prelude::*, channel::mpsc}; use futures::{prelude::*, channel::mpsc};
use log::{debug, trace}; use log::{debug, trace};
use parking_lot::Mutex; use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use std::{pin::Pin, sync::Arc, task::{Context, Poll}}; use std::{pin::Pin, sync::Arc, task::{Context, Poll}};
use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
@@ -178,10 +179,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
service: N, service: N,
config: crate::Config, config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>, set_state: crate::environment::SharedVoterSetState<B>,
prometheus_registry: Option<&Registry>,
) -> Self { ) -> Self {
let (validator, report_stream) = GossipValidator::new( let (validator, report_stream) = GossipValidator::new(
config, config,
set_state.clone(), set_state.clone(),
prometheus_registry,
); );
let validator = Arc::new(validator); let validator = Arc::new(validator);
@@ -179,6 +179,7 @@ pub(crate) fn make_test_network() -> (
net.clone(), net.clone(),
config(), config(),
voter_set_state(), voter_set_state(),
None,
); );
( (
@@ -591,6 +591,7 @@ pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, SC, VR>(
network, network,
config.clone(), config.clone(),
persistent_data.set_state.clone(), persistent_data.set_state.clone(),
prometheus_registry.as_ref(),
); );
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
@@ -178,6 +178,7 @@ where
network, network,
config.clone(), config.clone(),
persistent_data.set_state.clone(), persistent_data.set_state.clone(),
None,
); );
let observer_work = ObserverWork::new( let observer_work = ObserverWork::new(
@@ -1220,6 +1220,7 @@ fn voter_persists_its_votes() {
net.lock().peers[1].network_service().clone(), net.lock().peers[1].network_service().clone(),
config.clone(), config.clone(),
set_state, set_state,
None,
); );
let (round_rx, round_tx) = network.round_communication( let (round_rx, round_tx) = network.round_communication(
@@ -1616,6 +1617,7 @@ fn grandpa_environment_respects_voting_rules() {
network_service.clone(), network_service.clone(),
config.clone(), config.clone(),
set_state.clone(), set_state.clone(),
None,
); );
Environment { Environment {