diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 0f8a1a1..d6c1f40 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -119,8 +119,10 @@ pub struct Metrics { /// How many messages are currently queued up in internal channels /// waiting to be sent out to feeds. pub total_messages_to_feeds: usize, - /// How many messages are queued waiting to be handled by this aggregator. - pub total_messages_to_aggregator: usize, + /// How many messages are currently queued waiting to be handled by this aggregator. + pub current_messages_to_aggregator: usize, + /// The total number of messages sent to the aggregator. + pub total_messages_to_aggregator: u64, /// How many (non-critical) messages have been dropped by the aggregator because it was overwhelmed. pub dropped_messages_to_aggregator: u64, /// How many nodes are currently known to this aggregator. @@ -211,13 +213,15 @@ impl InnerLoop { let max_queue_len = self.max_queue_len; let (metered_tx, metered_rx) = flume::unbounded(); - // Keep count of the number of messages we drop for the sake of metric reporting + // Keep count of the number of dropped/total messages for the sake of metric reporting let dropped_messages = Arc::new(AtomicU64::new(0)); + let total_messages = Arc::new(AtomicU64::new(0)); // Actually handle all of our messages, but before we get here, we // check the length of the queue below to decide whether or not to // pass the message on to this. let dropped_messages2 = Arc::clone(&dropped_messages); + let total_messages2 = Arc::clone(&total_messages); tokio::spawn(async move { while let Ok(msg) = metered_rx.recv_async().await { match msg { @@ -234,12 +238,15 @@ impl InnerLoop { tx, metered_rx.len(), dropped_messages2.load(Ordering::Relaxed), + total_messages2.load(Ordering::Relaxed), ), } } }); while let Ok(msg) = rx_from_external.recv_async().await { + total_messages.fetch_add(1, Ordering::Relaxed); + // ignore node updates if we have too many messages to handle, in an attempt // to reduce the queue length back to something reasonable, lest it get out of // control and start consuming a load of memory. @@ -266,8 +273,9 @@ impl InnerLoop { fn handle_gather_metrics( &mut self, rx: flume::Sender, - total_messages_to_aggregator: usize, + current_messages_to_aggregator: usize, dropped_messages_to_aggregator: u64, + total_messages_to_aggregator: u64, ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); @@ -285,6 +293,7 @@ impl InnerLoop { subscribed_feeds, subscribed_finality_feeds, total_messages_to_feeds, + current_messages_to_aggregator, total_messages_to_aggregator, dropped_messages_to_aggregator, connected_nodes, diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index fedc0f0..3af3f6f 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -507,44 +507,59 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response