Adjustments to Kademlia-related metrics (#5660)

* Turn kbuckets_num_nodes into a GaugeVec

* random_kademlia_queries -> kademlia_random_queries

* kademalia_random_queries_total now a CounterVec

* Add metrics about records store
This commit is contained in:
Pierre Krieger
2020-04-17 09:11:47 +02:00
committed by GitHub
parent bc0b8fbddf
commit 0fd5643e84
4 changed files with 103 additions and 31 deletions
+17 -5
View File
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{ use crate::{
config::Role, config::{ProtocolId, Role},
debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
Event, ObservedRole, DhtEvent, ExHashT, Event, ObservedRole, DhtEvent, ExHashT,
}; };
@@ -61,7 +61,7 @@ pub enum BehaviourOut<B: BlockT> {
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification), JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>), FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Started a random Kademlia discovery query. /// Started a random Kademlia discovery query.
RandomKademliaStarted, RandomKademliaStarted(ProtocolId),
Event(Event), Event(Event),
} }
@@ -98,10 +98,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
} }
/// Returns the number of nodes that are in the Kademlia k-buckets. /// Returns the number of nodes that are in the Kademlia k-buckets.
pub fn num_kbuckets_entries(&mut self) -> usize { pub fn num_kbuckets_entries(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
self.discovery.num_kbuckets_entries() self.discovery.num_kbuckets_entries()
} }
/// Returns the number of records in the Kademlia record stores.
pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
self.discovery.num_kademlia_records()
}
/// Returns the total size in bytes of all the records in the Kademlia record stores.
pub fn kademlia_records_total_size(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
self.discovery.kademlia_records_total_size()
}
/// Borrows `self` and returns a struct giving access to the information about a node. /// Borrows `self` and returns a struct giving access to the information about a node.
/// ///
/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes /// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
@@ -268,8 +278,10 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
DiscoveryOut::ValuePutFailed(key) => { DiscoveryOut::ValuePutFailed(key) => {
self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key))));
} }
DiscoveryOut::RandomKademliaStarted => { DiscoveryOut::RandomKademliaStarted(protocols) => {
self.events.push(BehaviourOut::RandomKademliaStarted); for protocol in protocols {
self.events.push(BehaviourOut::RandomKademliaStarted(protocol));
}
} }
} }
} }
+26 -7
View File
@@ -55,7 +55,7 @@ use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum, Record};
use libp2p::kad::GetClosestPeersError; use libp2p::kad::GetClosestPeersError;
use libp2p::kad::handler::KademliaHandler; use libp2p::kad::handler::KademliaHandler;
use libp2p::kad::QueryId; use libp2p::kad::QueryId;
use libp2p::kad::record::{self, store::MemoryStore}; use libp2p::kad::record::{self, store::{MemoryStore, RecordStore}};
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
use libp2p::swarm::toggle::Toggle; use libp2p::swarm::toggle::Toggle;
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
@@ -77,7 +77,7 @@ pub struct DiscoveryConfig {
} }
impl DiscoveryConfig { impl DiscoveryConfig {
/// Crate a default configuration with the given public key. /// Create a default configuration with the given public key.
pub fn new(local_public_key: PublicKey) -> Self { pub fn new(local_public_key: PublicKey) -> Self {
let mut this = DiscoveryConfig { let mut this = DiscoveryConfig {
local_peer_id: local_public_key.into_peer_id(), local_peer_id: local_public_key.into_peer_id(),
@@ -276,8 +276,27 @@ impl DiscoveryBehaviour {
} }
/// Returns the number of nodes that are in the Kademlia k-buckets. /// Returns the number of nodes that are in the Kademlia k-buckets.
pub fn num_kbuckets_entries(&mut self) -> usize { pub fn num_kbuckets_entries(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
self.known_peers().count() self.kademlias.iter_mut().map(|(id, kad)| (id, kad.kbuckets_entries().count()))
}
/// Returns the number of records in the Kademlia record stores.
pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
// Note that this code is ok only because we use a `MemoryStore`.
self.kademlias.iter_mut().map(|(id, kad)| {
let num = kad.store_mut().records().count();
(id, num)
})
}
/// Returns the total size in bytes of all the records in the Kademlia record stores.
pub fn kademlia_records_total_size(&mut self) -> impl ExactSizeIterator<Item = (&ProtocolId, usize)> {
// Note that this code is ok only because we use a `MemoryStore`. If the records were
// for example stored on disk, this would load every single one of them every single time.
self.kademlias.iter_mut().map(|(id, kad)| {
let size = kad.store_mut().records().fold(0, |tot, rec| tot + rec.value.len());
(id, size)
})
} }
} }
@@ -307,8 +326,8 @@ pub enum DiscoveryOut {
/// Inserting a value into the DHT failed. /// Inserting a value into the DHT failed.
ValuePutFailed(record::Key), ValuePutFailed(record::Key),
/// Started a random Kademlia query. /// Started a random Kademlia query for each DHT identified by the given `ProtocolId`s.
RandomKademliaStarted, RandomKademliaStarted(Vec<ProtocolId>),
} }
impl NetworkBehaviour for DiscoveryBehaviour { impl NetworkBehaviour for DiscoveryBehaviour {
@@ -515,7 +534,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
Duration::from_secs(60)); Duration::from_secs(60));
if actually_started { if actually_started {
let ev = DiscoveryOut::RandomKademliaStarted; let ev = DiscoveryOut::RandomKademliaStarted(self.kademlias.keys().cloned().collect());
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
} }
} }
+57 -16
View File
@@ -880,7 +880,10 @@ struct Metrics {
incoming_connections_total: Counter<U64>, incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>, is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>, issued_light_requests: Counter<U64>,
kbuckets_num_nodes: Gauge<U64>, kademlia_random_queries_total: CounterVec<U64>,
kademlia_records_count: GaugeVec<U64>,
kademlia_records_sizes_total: GaugeVec<U64>,
kbuckets_num_nodes: GaugeVec<U64>,
listeners_local_addresses: Gauge<U64>, listeners_local_addresses: Gauge<U64>,
listeners_errors_total: Counter<U64>, listeners_errors_total: Counter<U64>,
network_per_sec_bytes: GaugeVec<U64>, network_per_sec_bytes: GaugeVec<U64>,
@@ -893,7 +896,6 @@ struct Metrics {
peerset_num_requested: Gauge<U64>, peerset_num_requested: Gauge<U64>,
pending_connections: Gauge<U64>, pending_connections: Gauge<U64>,
pending_connections_errors_total: CounterVec<U64>, pending_connections_errors_total: CounterVec<U64>,
random_kademalia_queries_total: Counter<U64>,
} }
impl Metrics { impl Metrics {
@@ -945,8 +947,33 @@ impl Metrics {
"issued_light_requests", "issued_light_requests",
"Number of light client requests that our node has issued.", "Number of light client requests that our node has issued.",
)?, registry)?, )?, registry)?,
kbuckets_num_nodes: register(Gauge::new( kademlia_random_queries_total: register(CounterVec::new(
"sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets" Opts::new(
"sub_libp2p_kademlia_random_queries_total",
"Number of random Kademlia queries started"
),
&["protocol"]
)?, registry)?,
kademlia_records_count: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kademlia_records_count",
"Number of records in the Kademlia records store"
),
&["protocol"]
)?, registry)?,
kademlia_records_sizes_total: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kademlia_records_sizes_total",
"Total size of all the records in the Kademlia records store"
),
&["protocol"]
)?, registry)?,
kbuckets_num_nodes: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kbuckets_num_nodes",
"Number of nodes in the Kademlia k-buckets"
),
&["protocol"]
)?, registry)?, )?, registry)?,
listeners_local_addresses: register(Gauge::new( listeners_local_addresses: register(Gauge::new(
"sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on" "sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on"
@@ -1017,24 +1044,23 @@ impl Metrics {
), ),
&["reason"] &["reason"]
)?, registry)?, )?, registry)?,
random_kademalia_queries_total: register(Counter::new(
"sub_libp2p_random_kademalia_queries_total", "Number of random Kademlia queries started",
)?, registry)?,
}) })
} }
fn update_with_network_event(&self, event: &Event) { fn update_with_network_event(&self, event: &Event) {
match event { match event {
Event::NotificationStreamOpened { engine_id, .. } => { Event::NotificationStreamOpened { engine_id, .. } => {
self.notifications_streams_opened_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc(); self.notifications_streams_opened_total
.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id)]).inc();
}, },
Event::NotificationStreamClosed { engine_id, .. } => { Event::NotificationStreamClosed { engine_id, .. } => {
self.notifications_streams_closed_total.with_label_values(&[&engine_id_to_string(&engine_id)]).inc(); self.notifications_streams_closed_total
.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id)]).inc();
}, },
Event::NotificationsReceived { messages, .. } => { Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages { for (engine_id, message) in messages {
self.notifications_sizes self.notifications_sizes
.with_label_values(&["in", &engine_id_to_string(&engine_id)]) .with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)])
.observe(message.len() as f64); .observe(message.len() as f64);
} }
}, },
@@ -1097,7 +1123,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => { ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = this.metrics.as_ref() {
metrics.notifications_sizes metrics.notifications_sizes
.with_label_values(&["out", &engine_id_to_string(&engine_id)]) .with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
.observe(message.len() as f64); .observe(message.len() as f64);
} }
this.network_service.user_protocol_mut().write_notification(target, engine_id, message) this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
@@ -1137,9 +1163,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
} }
this.import_queue.import_finality_proof(origin, hash, nb, proof); this.import_queue.import_finality_proof(origin, hash, nb, proof);
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => { Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => {
if let Some(metrics) = this.metrics.as_ref() { if let Some(metrics) = this.metrics.as_ref() {
metrics.random_kademalia_queries_total.inc(); metrics.kademlia_random_queries_total
.with_label_values(&[&maybe_utf8_bytes_to_string(protocol.as_bytes())])
.inc();
} }
}, },
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => { Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
@@ -1292,7 +1320,18 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec()); metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec());
metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec()); metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
metrics.is_major_syncing.set(is_major_syncing as u64); metrics.is_major_syncing.set(is_major_syncing as u64);
metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64); for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
metrics.kbuckets_num_nodes.with_label_values(&[&proto]).set(num_entries as u64);
}
for (proto, num_entries) in this.network_service.num_kademlia_records() {
let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
metrics.kademlia_records_count.with_label_values(&[&proto]).set(num_entries as u64);
}
for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
metrics.kademlia_records_sizes_total.with_label_values(&[&proto]).set(num_entries as u64);
}
metrics.peers_count.set(num_connected_peers as u64); metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64); metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64); metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
@@ -1306,8 +1345,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> { impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
} }
/// Turns a `ConsensusEngineId` into a representable string. /// Turns bytes that are potentially UTF-8 into a reasonable representable string.
fn engine_id_to_string(id: &ConsensusEngineId) -> Cow<str> { ///
/// Meant to be used only for debugging or metrics-reporting purposes.
fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
if let Ok(s) = std::str::from_utf8(&id[..]) { if let Ok(s) = std::str::from_utf8(&id[..]) {
Cow::Borrowed(s) Cow::Borrowed(s)
} else { } else {
@@ -31,7 +31,7 @@
//! //!
use crate::Event; use crate::Event;
use super::engine_id_to_string; use super::maybe_utf8_bytes_to_string;
use futures::{prelude::*, channel::mpsc, ready}; use futures::{prelude::*, channel::mpsc, ready};
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -240,7 +240,7 @@ impl Metrics {
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name]) .with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name])
.inc_by(num); .inc_by(num);
self.notifications_sizes self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "sent", name]) .with_label_values(&[&maybe_utf8_bytes_to_string(engine_id), "sent", name])
.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value()))); .inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
} }
}, },
@@ -270,7 +270,7 @@ impl Metrics {
.with_label_values(&[&format!("notif-{:?}", engine_id), "received", name]) .with_label_values(&[&format!("notif-{:?}", engine_id), "received", name])
.inc(); .inc();
self.notifications_sizes self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "received", name]) .with_label_values(&[&maybe_utf8_bytes_to_string(engine_id), "received", name])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value())); .inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
} }
}, },