Give names to channels (#5626)

* Give names to channels

* Fix

* A couple more changes

* More minor tweaks

* Fix test
This commit is contained in:
Pierre Krieger
2020-04-14 14:49:41 +02:00
committed by GitHub
parent 5afa74254e
commit 1e1b0e2767
5 changed files with 34 additions and 24 deletions
+6 -2
View File
@@ -575,9 +575,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// If this method is called multiple times, the events are duplicated.
///
/// The stream never ends (unless the `NetworkWorker` gets shut down).
pub fn event_stream(&self) -> impl Stream<Item = Event> {
///
/// The name passed is used to identify the channel in the Prometheus metrics. Note that the
/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = out_events::channel();
let (tx, rx) = out_events::channel(name);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
@@ -43,11 +43,13 @@ use std::{
};
/// Creates a new channel that can be associated to a [`OutChannels`].
pub fn channel() -> (Sender, Receiver) {
///
/// The name is used in Prometheus reports.
pub fn channel(name: &'static str) -> (Sender, Receiver) {
let (tx, rx) = mpsc::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender { inner: tx, metrics: metrics.clone() };
let rx = Receiver { inner: rx, metrics };
let tx = Sender { inner: tx, name, metrics: metrics.clone() };
let rx = Receiver { inner: rx, name, metrics };
(tx, rx)
}
@@ -60,6 +62,7 @@ pub fn channel() -> (Sender, Receiver) {
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
inner: mpsc::UnboundedSender<Event>,
name: &'static str,
/// Clone of [`Receiver::metrics`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
@@ -82,6 +85,7 @@ impl Drop for Sender {
/// Receiving side of a channel.
pub struct Receiver {
inner: mpsc::UnboundedReceiver<Event>,
name: &'static str,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
@@ -94,7 +98,7 @@ impl Stream for Receiver {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let metrics = self.metrics.lock().clone();
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
metrics.event_out(&ev);
metrics.event_out(&ev, self.name);
} else {
log::warn!("Inconsistency in out_events: event happened before sender associated");
}
@@ -161,7 +165,9 @@ impl OutChannels {
});
if let Some(metrics) = &*self.metrics {
metrics.event_in(&event, self.event_streams.len() as u64);
for ev in &self.event_streams {
metrics.event_in(&event, 1, ev.name);
}
}
}
}
@@ -190,7 +196,7 @@ impl Metrics {
"Number of broadcast network events that have been sent or received across all \
channels"
),
&["event_name", "action"]
&["event_name", "action", "name"]
)?, registry)?,
notifications_sizes: register(CounterVec::new(
Opts::new(
@@ -198,7 +204,7 @@ impl Metrics {
"Size of notification events that have been sent or received across all \
channels"
),
&["protocol", "action"]
&["protocol", "action", "name"]
)?, registry)?,
num_channels: register(Gauge::new(
"sub_libp2p_out_events_num_channels",
@@ -207,60 +213,60 @@ impl Metrics {
})
}
fn event_in(&self, event: &Event, num: u64) {
fn event_in(&self, event: &Event, num: u64, name: &str) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "sent"])
.with_label_values(&["dht", "sent", name])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent", name])
.inc_by(num);
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent", name])
.inc_by(num);
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name])
.inc_by(num);
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "sent"])
.with_label_values(&[&engine_id_to_string(engine_id), "sent", name])
.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
}
},
}
}
fn event_out(&self, event: &Event) {
fn event_out(&self, event: &Event, name: &str) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "received"])
.with_label_values(&["dht", "received", name])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name])
.inc();
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received", name])
.inc();
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-{:?}", engine_id), "received", name])
.inc();
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "received"])
.with_label_values(&[&engine_id_to_string(engine_id), "received", name])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
}
},
@@ -106,7 +106,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
.unwrap();
let service = worker.service().clone();
let event_stream = service.event_stream();
let event_stream = service.event_stream("test");
async_std::task::spawn(async move {
futures::pin_mut!(worker);