client/network: Expose DHT query duration to Prometheus (#6784)

Expose duration of DHT put and get request as a Prometheus histogram.
This commit is contained in:
Max Inden
2020-08-03 10:30:06 +02:00
committed by GitHub
parent a1786a92ec
commit 1365eef4c1
3 changed files with 58 additions and 25 deletions
+11 -10
View File
@@ -145,8 +145,9 @@ pub enum BehaviourOut<B: BlockT> {
messages: Vec<(ConsensusEngineId, Bytes)>,
},
/// Event generated by a DHT.
Dht(DhtEvent),
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
/// request duration.
Dht(DhtEvent, Duration),
}
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
@@ -454,17 +455,17 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
DiscoveryOut::Discovered(peer_id) => {
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
DiscoveryOut::ValueFound(results) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
DiscoveryOut::ValueFound(results, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results), duration));
}
DiscoveryOut::ValueNotFound(key) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key)));
DiscoveryOut::ValueNotFound(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key), duration));
}
DiscoveryOut::ValuePut(key) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key)));
DiscoveryOut::ValuePut(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key), duration));
}
DiscoveryOut::ValuePutFailed(key) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key)));
DiscoveryOut::ValuePutFailed(key, duration) => {
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration));
}
DiscoveryOut::RandomKademliaStarted(protocols) => {
for protocol in protocols {
+21 -13
View File
@@ -312,7 +312,7 @@ impl DiscoveryBehaviour {
for k in self.kademlias.values_mut() {
if let Err(e) = k.put_record(Record::new(key.clone(), value.clone()), Quorum::All) {
warn!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e);
self.pending_events.push_back(DiscoveryOut::ValuePutFailed(key.clone()));
self.pending_events.push_back(DiscoveryOut::ValuePutFailed(key.clone(), Duration::from_secs(0)));
}
}
}
@@ -379,17 +379,25 @@ pub enum DiscoveryOut {
/// the `identify` protocol.
UnroutablePeer(PeerId),
/// The DHT yielded results for the record request, grouped in (key, value) pairs.
ValueFound(Vec<(record::Key, Vec<u8>)>),
/// The DHT yielded results for the record request.
///
/// Returning the result grouped in (key, value) pairs as well as the request duration..
ValueFound(Vec<(record::Key, Vec<u8>)>, Duration),
/// The record requested was not found in the DHT.
ValueNotFound(record::Key),
///
/// Returning the corresponding key as well as the request duration.
ValueNotFound(record::Key, Duration),
/// The record with a given key was successfully inserted into the DHT.
ValuePut(record::Key),
///
/// Returning the corresponding key as well as the request duration.
ValuePut(record::Key, Duration),
/// Inserting a value into the DHT failed.
ValuePutFailed(record::Key),
///
/// Returning the corresponding key as well as the request duration.
ValuePutFailed(record::Key, Duration),
/// Started a random Kademlia query for each DHT identified by the given `ProtocolId`s.
RandomKademliaStarted(Vec<ProtocolId>),
@@ -620,7 +628,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}
}
}
KademliaEvent::QueryResult { result: QueryResult::GetRecord(res), .. } => {
KademliaEvent::QueryResult { result: QueryResult::GetRecord(res), stats, .. } => {
let ev = match res {
Ok(ok) => {
let results = ok.records
@@ -628,28 +636,28 @@ impl NetworkBehaviour for DiscoveryBehaviour {
.map(|r| (r.record.key, r.record.value))
.collect();
DiscoveryOut::ValueFound(results)
DiscoveryOut::ValueFound(results, stats.duration().unwrap_or_else(Default::default))
}
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
trace!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key())
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
KademliaEvent::QueryResult { result: QueryResult::PutRecord(res), .. } => {
KademliaEvent::QueryResult { result: QueryResult::PutRecord(res), stats, .. } => {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key),
Ok(ok) => DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_else(Default::default)),
Err(e) => {
warn!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key())
DiscoveryOut::ValuePutFailed(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
+26 -2
View File
@@ -31,6 +31,7 @@ use crate::{
ExHashT, NetworkStateInfo,
behaviour::{Behaviour, BehaviourOut},
config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent,
discovery::DiscoveryConfig,
error::Error,
network_state::{
@@ -1119,6 +1120,7 @@ struct Metrics {
incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>,
kademlia_query_duration: HistogramVec,
kademlia_random_queries_total: CounterVec<U64>,
kademlia_records_count: GaugeVec<U64>,
kademlia_records_sizes_total: GaugeVec<U64>,
@@ -1196,6 +1198,17 @@ impl Metrics {
"issued_light_requests",
"Number of light client requests that our node has issued.",
)?, registry)?,
kademlia_query_duration: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_kademlia_query_duration",
"Duration of Kademlia queries per protocol and query type"
),
buckets: prometheus_endpoint::exponential_buckets(0.5, 2.0, 10)
.expect("parameters are always valid values; qed"),
},
&["type"]
)?, registry)?,
kademlia_random_queries_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_kademlia_random_queries_total",
@@ -1508,8 +1521,19 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
messages,
});
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(ev))) => {
this.event_streams.send(Event::Dht(ev));
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => {
if let Some(metrics) = this.metrics.as_ref() {
let query_type = match event {
DhtEvent::ValueFound(_) => "value-found",
DhtEvent::ValueNotFound(_) => "value-not-found",
DhtEvent::ValuePut(_) => "value-put",
DhtEvent::ValuePutFailed(_) => "value-put-failed",
};
metrics.kademlia_query_duration.with_label_values(&[query_type])
.observe(duration.as_secs_f64());
}
this.event_streams.send(Event::Dht(event));
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);