Separate metrics for messages sent & received (#2721)

* metered channel - sent & received

* Add for readouts

* metrics for both sent & received

* retract on send failure
This commit is contained in:
Robert Habermeier
2021-03-26 14:11:01 +01:00
committed by GitHub
parent 064df81ee4
commit 73b9247c10
6 changed files with 140 additions and 56 deletions
+47 -17
View File
@@ -1327,8 +1327,10 @@ struct MetricsInner {
deactivated_heads_total: prometheus::Counter<prometheus::U64>,
messages_relayed_total: prometheus::Counter<prometheus::U64>,
message_relay_timings: prometheus::Histogram,
to_overseer_channel_queue_size: prometheus::Gauge<prometheus::U64>,
from_overseer_channel_queue_size: prometheus::GaugeVec<prometheus::U64>,
to_overseer_sent: prometheus::Gauge<prometheus::U64>,
to_overseer_received: prometheus::Gauge<prometheus::U64>,
from_overseer_sent: prometheus::GaugeVec<prometheus::U64>,
from_overseer_received: prometheus::GaugeVec<prometheus::U64>,
}
#[derive(Default, Clone)]
@@ -1360,15 +1362,21 @@ impl Metrics {
fn channel_fill_level_snapshot(
&self,
from_overseer: AllSubsystemsSame<(&'static str, usize)>,
to_overseer: usize,
from_overseer: AllSubsystemsSame<(&'static str, metered::Readout)>,
to_overseer: metered::Readout,
) {
self.0.as_ref().map(|metrics| {
from_overseer.map_subsystems(|(name, queue_size): (_, usize)| {
metrics.from_overseer_channel_queue_size.with_label_values(&[name]).set(queue_size as u64);
})
from_overseer.map_subsystems(|(name, readout): (_, metered::Readout)| {
metrics.from_overseer_sent.with_label_values(&[name])
.set(readout.sent as u64);
metrics.from_overseer_received.with_label_values(&[name])
.set(readout.received as u64);
});
metrics.to_overseer_sent.set(to_overseer.sent as u64);
metrics.to_overseer_received.set(to_overseer.received as u64);
});
self.0.as_ref().map(|metrics| metrics.to_overseer_channel_queue_size.set(to_overseer as u64));
}
}
@@ -1418,11 +1426,11 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
from_overseer_channel_queue_size: prometheus::register(
from_overseer_sent: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"parachain_from_overseer_channel_queue_size",
"Number of elements sitting in the channel from the overseer waiting to be processed.",
"parachain_from_overseer_sent",
"Number of elements sent by the overseer to subsystems",
),
&[
"subsystem_name",
@@ -1430,11 +1438,32 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
to_overseer_channel_queue_size: prometheus::register(
from_overseer_received: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"parachain_from_overseer_received",
"Number of elements received by subsystems from overseer",
),
&[
"subsystem_name",
],
)?,
registry,
)?,
to_overseer_sent: prometheus::register(
prometheus::Gauge::<prometheus::U64>::with_opts(
prometheus::Opts::new(
"parachain_to_overseer_channel_queue_size",
"Number of elements sitting in the channel to the overseer waiting to be processed.",
"parachain_to_overseer_sent",
"Number of elements sent by subsystems to overseer",
),
)?,
registry,
)?,
to_overseer_received: prometheus::register(
prometheus::Gauge::<prometheus::U64>::with_opts(
prometheus::Opts::new(
"parachain_to_overseer_received",
"Number of element received by overseer from subsystems",
),
)?,
registry,
@@ -1820,14 +1849,14 @@ where
let metronome = Metronome::new(std::time::Duration::from_millis(950))
.for_each(move |_| {
let to_subsystem_counts = subsystem_meters.as_ref()
.map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.queue_count()));
.map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.read()));
// We combine the amount of messages from subsystems to the overseer
// as well as the amount of messages from external sources to the overseer
// into one to_overseer value.
metronome_metrics.channel_fill_level_snapshot(
to_subsystem_counts,
meter_subsystem_to_overseer.queue_count() + meter_external_to_overseer.queue_count(),
meter_subsystem_to_overseer.read() + meter_external_to_overseer.read(),
);
async move {
@@ -2467,7 +2496,8 @@ mod tests {
assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total");
assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total");
assert_eq!(gather[3].get_name(), "parachain_overseer_messages_relay_timings");
assert_eq!(gather[4].get_name(), "parachain_to_overseer_channel_queue_size");
assert_eq!(gather[4].get_name(), "parachain_to_overseer_received");
assert_eq!(gather[5].get_name(), "parachain_to_overseer_sent");
let activated = gather[0].get_metric()[0].get_counter().get_value() as u64;
let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64;
let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;