mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 07:01:05 +00:00
client/authority-discovery: Instrument code with Prometheus (#5195)
* client/authority-discovery: Instrument code with Prometheus

  Introduce Prometheus metrics into the authority discovery module enabling one to observe:

  - authority_discovery_times_published_total
  - authority_discovery_amount_external_addresses_last_published
  - authority_discovery_times_requested_total
  - authority_discovery_dht_event_received

* client/authority-discovery/src/lib.rs: Reword metric help texts

  Co-Authored-By: Ashley <ashley.ruglys@gmail.com>

* client/authority-discovery/src/lib.rs: Reword metric help text

  Co-Authored-By: Ashley <ashley.ruglys@gmail.com>

* client/authority-discovery/tests: Fix struct instantiation + basic test

Co-authored-by: Ashley <ashley.ruglys@gmail.com>
This commit is contained in:
@@ -20,6 +20,7 @@ futures = "0.3.1"
|
||||
futures-timer = "3.0.1"
|
||||
libp2p = { version = "0.16.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
|
||||
log = "0.4.8"
|
||||
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-alpha.2" }
|
||||
prost = "0.6.1"
|
||||
rand = "0.7.2"
|
||||
sc-client-api = { version = "2.0.0-alpha.2", path = "../api" }
|
||||
|
||||
@@ -46,4 +46,6 @@ pub enum Error {
|
||||
EncodingDecodingScale(codec::Error),
|
||||
/// Failed to parse a libp2p multi address.
|
||||
ParsingMultiaddress(libp2p::core::multiaddr::Error),
|
||||
/// Failed to register Prometheus metric.
|
||||
Prometheus(prometheus_endpoint::PrometheusError),
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ use codec::{Decode, Encode};
|
||||
use error::{Error, Result};
|
||||
use libp2p::Multiaddr;
|
||||
use log::{debug, error, log_enabled, warn};
|
||||
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
|
||||
use prost::Message;
|
||||
use sc_client_api::blockchain::HeaderBackend;
|
||||
use sc_network::{DhtEvent, ExHashT, NetworkStateInfo};
|
||||
@@ -87,6 +88,53 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
|
||||
/// discovery module.
|
||||
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
|
||||
|
||||
/// Prometheus metrics for an `AuthorityDiscovery`.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Metrics {
|
||||
publish: Counter<U64>,
|
||||
amount_last_published: Gauge<U64>,
|
||||
request: Counter<U64>,
|
||||
dht_event_received: CounterVec<U64>,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
|
||||
Ok(Self {
|
||||
publish: register(
|
||||
Counter::new(
|
||||
"authority_discovery_times_published_total",
|
||||
"Number of times authority discovery has published external addresses."
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
amount_last_published: register(
|
||||
Gauge::new(
|
||||
"authority_discovery_amount_external_addresses_last_published",
|
||||
"Number of external addresses published when authority discovery last published addresses ."
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
request: register(
|
||||
Counter::new(
|
||||
"authority_discovery_times_requested_total",
|
||||
"Number of times authority discovery has requested external addresses."
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
dht_event_received: register(
|
||||
CounterVec::new(
|
||||
Opts::new(
|
||||
"authority_discovery_dht_event_received",
|
||||
"Number of dht events received by authority discovery."
|
||||
),
|
||||
&["name"],
|
||||
)?,
|
||||
registry,
|
||||
)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
|
||||
pub struct AuthorityDiscovery<Client, Network, Block>
|
||||
where
|
||||
@@ -118,6 +166,8 @@ where
|
||||
|
||||
addr_cache: addr_cache::AddrCache<AuthorityId, Multiaddr>,
|
||||
|
||||
metrics: Option<Metrics>,
|
||||
|
||||
phantom: PhantomData<Block>,
|
||||
}
|
||||
|
||||
@@ -140,6 +190,7 @@ where
|
||||
sentry_nodes: Vec<String>,
|
||||
key_store: BareCryptoStorePtr,
|
||||
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
|
||||
prometheus_registry: Option<prometheus_endpoint::Registry>,
|
||||
) -> Self {
|
||||
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
|
||||
// Given that a node could restart at any point in time, one can not depend on the
|
||||
@@ -177,6 +228,19 @@ where
|
||||
|
||||
let addr_cache = AddrCache::new();
|
||||
|
||||
let metrics = match prometheus_registry {
|
||||
Some(registry) => {
|
||||
match Metrics::register(&registry) {
|
||||
Ok(metrics) => Some(metrics),
|
||||
Err(e) => {
|
||||
error!(target: "sub-authority-discovery", "Failed to register metrics: {:?}", e);
|
||||
None
|
||||
},
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
AuthorityDiscovery {
|
||||
client,
|
||||
network,
|
||||
@@ -186,13 +250,18 @@ where
|
||||
publish_interval,
|
||||
query_interval,
|
||||
addr_cache,
|
||||
metrics,
|
||||
phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Publish either our own or if specified the public addresses of our sentry nodes.
|
||||
fn publish_ext_addresses(&mut self) -> Result<()> {
|
||||
let addresses = match &self.sentry_nodes {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.publish.inc()
|
||||
}
|
||||
|
||||
let addresses: Vec<_> = match &self.sentry_nodes {
|
||||
Some(addrs) => addrs.clone().into_iter()
|
||||
.map(|a| a.to_vec())
|
||||
.collect(),
|
||||
@@ -205,6 +274,10 @@ where
|
||||
.collect(),
|
||||
};
|
||||
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.amount_last_published.set(addresses.len() as u64);
|
||||
}
|
||||
|
||||
let mut serialized_addresses = vec![];
|
||||
schema::AuthorityAddresses { addresses }
|
||||
.encode(&mut serialized_addresses)
|
||||
@@ -231,6 +304,10 @@ where
|
||||
}
|
||||
|
||||
fn request_addresses_of_others(&mut self) -> Result<()> {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.request.inc();
|
||||
}
|
||||
|
||||
let id = BlockId::hash(self.client.info().best_hash);
|
||||
|
||||
let authorities = self
|
||||
@@ -251,6 +328,10 @@ where
|
||||
while let Poll::Ready(Some(event)) = self.dht_event_rx.poll_next_unpin(cx) {
|
||||
match event {
|
||||
DhtEvent::ValueFound(v) => {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
|
||||
}
|
||||
|
||||
if log_enabled!(log::Level::Debug) {
|
||||
let hashes = v.iter().map(|(hash, _value)| hash.clone());
|
||||
debug!(
|
||||
@@ -261,17 +342,36 @@ where
|
||||
|
||||
self.handle_dht_value_found_event(v)?;
|
||||
}
|
||||
DhtEvent::ValueNotFound(hash) => debug!(
|
||||
target: "sub-authority-discovery",
|
||||
"Value for hash '{:?}' not found on Dht.", hash
|
||||
),
|
||||
DhtEvent::ValuePut(hash) => debug!(
|
||||
target: "sub-authority-discovery",
|
||||
"Successfully put hash '{:?}' on Dht.", hash),
|
||||
DhtEvent::ValuePutFailed(hash) => warn!(
|
||||
target: "sub-authority-discovery",
|
||||
"Failed to put hash '{:?}' on Dht.", hash
|
||||
),
|
||||
DhtEvent::ValueNotFound(hash) => {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
|
||||
}
|
||||
|
||||
debug!(
|
||||
target: "sub-authority-discovery",
|
||||
"Value for hash '{:?}' not found on Dht.", hash
|
||||
)
|
||||
},
|
||||
DhtEvent::ValuePut(hash) => {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
|
||||
}
|
||||
|
||||
debug!(
|
||||
target: "sub-authority-discovery",
|
||||
"Successfully put hash '{:?}' on Dht.", hash,
|
||||
)
|
||||
},
|
||||
DhtEvent::ValuePutFailed(hash) => {
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
|
||||
}
|
||||
|
||||
warn!(
|
||||
target: "sub-authority-discovery",
|
||||
"Failed to put hash '{:?}' on Dht.", hash
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -394,7 +494,7 @@ where
|
||||
|
||||
/// Update the peer set 'authority' priority group.
|
||||
//
|
||||
fn update_peer_set_priority_group(&self) -> Result<()>{
|
||||
fn update_peer_set_priority_group(&self) -> Result<()> {
|
||||
let addresses = self.addr_cache.get_subset();
|
||||
|
||||
debug!(
|
||||
|
||||
@@ -275,6 +275,29 @@ impl NetworkStateInfo for TestNetwork {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_registers_metrics() {
|
||||
let (_dht_event_tx, dht_event_rx) = channel(1000);
|
||||
let network: Arc<TestNetwork> = Arc::new(Default::default());
|
||||
let key_store = KeyStore::new();
|
||||
let test_api = Arc::new(TestApi {
|
||||
authorities: vec![],
|
||||
});
|
||||
|
||||
let registry = prometheus_endpoint::Registry::new();
|
||||
|
||||
AuthorityDiscovery::new(
|
||||
test_api,
|
||||
network.clone(),
|
||||
vec![],
|
||||
key_store,
|
||||
dht_event_rx.boxed(),
|
||||
Some(registry.clone()),
|
||||
);
|
||||
|
||||
assert!(registry.gather().len() > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_ext_addresses_puts_record_on_dht() {
|
||||
let (_dht_event_tx, dht_event_rx) = channel(1000);
|
||||
@@ -294,6 +317,7 @@ fn publish_ext_addresses_puts_record_on_dht() {
|
||||
vec![],
|
||||
key_store,
|
||||
dht_event_rx.boxed(),
|
||||
None,
|
||||
);
|
||||
|
||||
authority_discovery.publish_ext_addresses().unwrap();
|
||||
@@ -324,6 +348,7 @@ fn request_addresses_of_others_triggers_dht_get_query() {
|
||||
vec![],
|
||||
key_store,
|
||||
dht_event_rx.boxed(),
|
||||
None,
|
||||
);
|
||||
|
||||
authority_discovery.request_addresses_of_others().unwrap();
|
||||
@@ -351,6 +376,7 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {
|
||||
vec![],
|
||||
key_store,
|
||||
dht_event_rx.boxed(),
|
||||
None,
|
||||
);
|
||||
|
||||
// Create sample dht event.
|
||||
|
||||
Reference in New Issue
Block a user