expose dropped message counts and fix some typos/wording

This commit is contained in:
James Wilson
2021-08-13 11:33:53 +01:00
parent 811babca27
commit b842c7fc8b
4 changed files with 23 additions and 7 deletions
+1 -1
View File
@@ -138,7 +138,7 @@ mod test {
use super::*;
#[test]
fn len_doesnt_panic_if_lots_of_retired() {
fn len_doesnt_panic_if_lots_of_ids_are_retired() {
let mut map = DenseMap::<usize, usize>::new();
let id1 = map.add(1);
@@ -86,7 +86,7 @@ impl Aggregator {
}
/// This is spawned into a separate task and handles any messages coming
/// in to the aggregator. If nobody is tolding the tx side of the channel
/// in to the aggregator. If nobody is holding the tx side of the channel
/// any more, this task will gracefully end.
async fn handle_messages(
rx_from_external: flume::Receiver<inner_loop::ToAggregator>,
@@ -30,6 +30,7 @@ use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};
use std::sync::{ Arc, atomic::{ Ordering, AtomicU64 } };
/// Incoming messages come via subscriptions, and end up looking like this.
#[derive(Clone, Debug)]
@@ -38,7 +39,7 @@ pub enum ToAggregator {
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location),
/// Hand back some metrics. The provided sender is expected not to block when
/// a message it sent into it.
/// a message is sent into it.
GatherMetrics(flume::Sender<Metrics>),
}
@@ -117,7 +118,9 @@ pub struct Metrics {
pub total_messages_to_feeds: usize,
/// How many messages are queued waiting to be handled by this aggregator.
pub total_messages_to_aggregator: usize,
/// How many nodes are currently known about by this aggregator.
/// How many (non-critical) messages have been dropped by the aggregator because it was overwhelmed.
pub dropped_messages_to_aggregator: u64,
/// How many nodes are currently known to this aggregator.
pub connected_nodes: usize,
/// How many feeds are currently connected to this aggregator.
pub connected_feeds: usize,
@@ -206,9 +209,13 @@ impl InnerLoop {
let max_queue_len = self.max_queue_len;
let (metered_tx, metered_rx) = flume::unbounded();
// Keep count of the number of messages we drop for the sake of metric reporting
let dropped_messages = Arc::new(AtomicU64::new(0));
// Actually handle all of our messages, but before we get here, we
// check the length of the queue below to decide whether or not to
// pass the message on to this.
let dropped_messages2 = Arc::clone(&dropped_messages);
tokio::spawn(async move {
while let Ok(msg) = metered_rx.recv_async().await {
match msg {
@@ -222,7 +229,7 @@ impl InnerLoop {
self.handle_from_find_location(node_id, location)
}
ToAggregator::GatherMetrics(tx) => {
self.handle_gather_metrics(tx, metered_rx.len())
self.handle_gather_metrics(tx, metered_rx.len(), dropped_messages2.load(Ordering::Relaxed))
}
}
}
@@ -237,6 +244,9 @@ impl InnerLoop {
msg,
ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. })
) {
// Note: this wraps on overflow (which is probably the best
// behaviour for graphing it anyway)
dropped_messages.fetch_add(1, Ordering::Relaxed);
continue;
}
}
@@ -253,6 +263,7 @@ impl InnerLoop {
&mut self,
rx: flume::Sender<Metrics>,
total_messages_to_aggregator: usize,
dropped_messages_to_aggregator: u64
) {
let timestamp_unix_ms = time::now();
let connected_nodes = self.node_ids.len();
@@ -271,6 +282,7 @@ impl InnerLoop {
subscribed_finality_feeds,
total_messages_to_feeds,
total_messages_to_aggregator,
dropped_messages_to_aggregator,
connected_nodes,
connected_feeds,
connected_shards,
+6 -2
View File
@@ -486,8 +486,8 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper:
let metrics = aggregator.latest_metrics();
// Instead of using the rust prometheus library (which is optimised around global variables updated across a codebase),
// we just split out the text format that prometheus expects ourselves, using whatever the latest metrics that we've
// captured so far from the aggregators are. See:
// we just split out the text format that prometheus expects ourselves, and use the latest metrics that we've
// captured so far from the aggregators. See:
//
// https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details
//
@@ -529,6 +529,10 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper:
"telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
idx, m.total_messages_to_aggregator, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_dropped_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
idx, m.dropped_messages_to_aggregator, m.timestamp_unix_ms
));
}
Response::builder()