Add metrics for the events in the network output channels (#5597)

* Add metrics for the events in the network output channels

* Documentation fixes

* A couple fixes

* Fix panic at destruction

* Rework for direct Prometheus integration

* Don't lock on the Receiver

* Another review address

* Address review

* Update client/network/src/service/out_events.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Fix bad event name

* Fix descriptions

* Fix names

* client/network/service/out_events: Apply remaining suggestions

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-04-09 20:30:34 +02:00
committed by GitHub
parent effc9bae92
commit 877a7ab531
2 changed files with 275 additions and 11 deletions
+6 -11
View File
@@ -68,6 +68,7 @@ use std::{
task::Poll,
};
mod out_events;
#[cfg(test)]
mod tests;
@@ -386,7 +387,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
import_queue: params.import_queue,
from_worker,
light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
event_streams: Vec::new(),
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
metrics,
boot_node_ids,
})
@@ -576,7 +577,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// The stream never ends (unless the `NetworkWorker` gets shut down).
pub fn event_stream(&self) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = tracing_unbounded("mpsc_network_event_stream");
let (tx, rx) = out_events::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
@@ -796,7 +797,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
PutValue(record::Key, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(TracingUnboundedSender<Event>),
EventStream(out_events::Sender),
WriteNotification {
message: Vec<u8>,
engine_id: ConsensusEngineId,
@@ -831,7 +832,7 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>,
/// Senders for events that happen on the network.
event_streams: Vec<TracingUnboundedSender<Event>>,
event_streams: out_events::OutChannels,
/// Prometheus network metrics.
metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes.
@@ -855,7 +856,6 @@ struct Metrics {
network_per_sec_bytes: GaugeVec<U64>,
notifications_queues_size: HistogramVec,
notifications_sizes: HistogramVec,
num_event_stream_channels: Gauge<U64>,
opened_notification_streams: GaugeVec<U64>,
peers_count: Gauge<U64>,
peerset_num_discovered: Gauge<U64>,
@@ -948,10 +948,6 @@ impl Metrics {
},
&["direction", "protocol"]
)?, registry)?,
num_event_stream_channels: register(Gauge::new(
"sub_libp2p_num_event_stream_channels",
"Number of internal active channels that broadcast network events",
)?, registry)?,
opened_notification_streams: register(GaugeVec::new(
Opts::new(
"sub_libp2p_opened_notification_streams",
@@ -1105,10 +1101,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
if let Some(metrics) = this.metrics.as_ref() {
metrics.update_with_network_event(&ev);
}
this.event_streams.send(ev);
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
@@ -1249,7 +1245,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
metrics.is_major_syncing.set(is_major_syncing as u64);
metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64);
metrics.num_event_stream_channels.set(this.event_streams.len() as u64);
metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);