diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index d529eb1..24bc60d 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -138,7 +138,7 @@ mod test { use super::*; #[test] - fn len_doesnt_panic_if_lots_of_retired() { + fn len_doesnt_panic_if_lots_of_ids_are_retired() { let mut map = DenseMap::::new(); let id1 = map.add(1); diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index d6604a8..c172680 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -86,7 +86,7 @@ impl Aggregator { } /// This is spawned into a separate task and handles any messages coming - /// in to the aggregator. If nobody is tolding the tx side of the channel + /// in to the aggregator. If nobody is holding the tx side of the channel /// any more, this task will gracefully end. async fn handle_messages( rx_from_external: flume::Receiver, diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 20ed612..35cc332 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -30,6 +30,7 @@ use std::{ net::{IpAddr, Ipv4Addr}, str::FromStr, }; +use std::sync::{ Arc, atomic::{ Ordering, AtomicU64 } }; /// Incoming messages come via subscriptions, and end up looking like this. #[derive(Clone, Debug)] @@ -38,7 +39,7 @@ pub enum ToAggregator { FromFeedWebsocket(ConnId, FromFeedWebsocket), FromFindLocation(NodeId, find_location::Location), /// Hand back some metrics. The provided sender is expected not to block when - /// a message it sent into it. + /// a message is sent into it. GatherMetrics(flume::Sender), } @@ -117,7 +118,9 @@ pub struct Metrics { pub total_messages_to_feeds: usize, /// How many messages are queued waiting to be handled by this aggregator. pub total_messages_to_aggregator: usize, - /// How many nodes are currently known about by this aggregator. + /// How many (non-critical) messages have been dropped by the aggregator because it was overwhelmed. + pub dropped_messages_to_aggregator: u64, + /// How many nodes are currently known to this aggregator. pub connected_nodes: usize, /// How many feeds are currently connected to this aggregator. pub connected_feeds: usize, @@ -206,9 +209,13 @@ impl InnerLoop { let max_queue_len = self.max_queue_len; let (metered_tx, metered_rx) = flume::unbounded(); + // Keep count of the number of messages we drop for the sake of metric reporting + let dropped_messages = Arc::new(AtomicU64::new(0)); + // Actually handle all of our messages, but before we get here, we // check the length of the queue below to decide whether or not to // pass the message on to this. + let dropped_messages2 = Arc::clone(&dropped_messages); tokio::spawn(async move { while let Ok(msg) = metered_rx.recv_async().await { match msg { @@ -222,7 +229,7 @@ impl InnerLoop { self.handle_from_find_location(node_id, location) } ToAggregator::GatherMetrics(tx) => { - self.handle_gather_metrics(tx, metered_rx.len()) + self.handle_gather_metrics(tx, metered_rx.len(), dropped_messages2.load(Ordering::Relaxed)) } } } @@ -237,6 +244,9 @@ impl InnerLoop { msg, ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }) ) { + // Note: this wraps on overflow (which is probably the best + // behaviour for graphing it anyway) + dropped_messages.fetch_add(1, Ordering::Relaxed); continue; } } @@ -253,6 +263,7 @@ impl InnerLoop { &mut self, rx: flume::Sender, total_messages_to_aggregator: usize, + dropped_messages_to_aggregator: u64 ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); @@ -271,6 +282,7 @@ impl InnerLoop { subscribed_finality_feeds, total_messages_to_feeds, total_messages_to_aggregator, + dropped_messages_to_aggregator, connected_nodes, connected_feeds, connected_shards, diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 9be898f..1508daa 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -486,8 +486,8 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response Response