Expose total messages sent to aggregator, too (#416)

* Expose total messages sent to aggregator so we can make better graphs with total dropped msgs

* cargo fmt

* use write to hopefully avoid some allocating

* add 'core' namespace to telemetry metrics for better future clarity

* cargo fmt
This commit is contained in:
James Wilson
2021-09-30 12:21:37 +01:00
committed by GitHub
parent 224b1faeba
commit 7ac88a7e84
2 changed files with 55 additions and 31 deletions
@@ -119,8 +119,10 @@ pub struct Metrics {
/// How many messages are currently queued up in internal channels
/// waiting to be sent out to feeds.
pub total_messages_to_feeds: usize,
/// How many messages are queued waiting to be handled by this aggregator.
pub total_messages_to_aggregator: usize,
/// How many messages are currently queued waiting to be handled by this aggregator.
pub current_messages_to_aggregator: usize,
/// The total number of messages sent to the aggregator.
pub total_messages_to_aggregator: u64,
/// How many (non-critical) messages have been dropped by the aggregator because it was overwhelmed.
pub dropped_messages_to_aggregator: u64,
/// How many nodes are currently known to this aggregator.
@@ -211,13 +213,15 @@ impl InnerLoop {
let max_queue_len = self.max_queue_len;
let (metered_tx, metered_rx) = flume::unbounded();
// Keep count of the number of messages we drop for the sake of metric reporting
// Keep count of the number of dropped/total messages for the sake of metric reporting
let dropped_messages = Arc::new(AtomicU64::new(0));
let total_messages = Arc::new(AtomicU64::new(0));
// Actually handle all of our messages, but before we get here, we
// check the length of the queue below to decide whether or not to
// pass the message on to this.
let dropped_messages2 = Arc::clone(&dropped_messages);
let total_messages2 = Arc::clone(&total_messages);
tokio::spawn(async move {
while let Ok(msg) = metered_rx.recv_async().await {
match msg {
@@ -234,12 +238,15 @@ impl InnerLoop {
tx,
metered_rx.len(),
dropped_messages2.load(Ordering::Relaxed),
total_messages2.load(Ordering::Relaxed),
),
}
}
});
while let Ok(msg) = rx_from_external.recv_async().await {
total_messages.fetch_add(1, Ordering::Relaxed);
// ignore node updates if we have too many messages to handle, in an attempt
// to reduce the queue length back to something reasonable, lest it get out of
// control and start consuming a load of memory.
@@ -266,8 +273,9 @@ impl InnerLoop {
fn handle_gather_metrics(
&mut self,
rx: flume::Sender<Metrics>,
total_messages_to_aggregator: usize,
current_messages_to_aggregator: usize,
dropped_messages_to_aggregator: u64,
total_messages_to_aggregator: u64,
) {
let timestamp_unix_ms = time::now();
let connected_nodes = self.node_ids.len();
@@ -285,6 +293,7 @@ impl InnerLoop {
subscribed_feeds,
subscribed_finality_feeds,
total_messages_to_feeds,
current_messages_to_aggregator,
total_messages_to_aggregator,
dropped_messages_to_aggregator,
connected_nodes,
+42 -27
View File
@@ -507,44 +507,59 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper:
// be handled correctly when pointing a current version of prometheus at it.
//
// Note: '{{' and '}}' are just escaped versions of '{' and '}' in Rust fmt strings.
use std::fmt::Write;
let mut s = String::new();
for (idx, m) in metrics.iter().enumerate() {
s.push_str(&format!(
"telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n",
let _ = write!(
&mut s,
"telemetry_core_connected_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.connected_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n",
);
let _ = write!(
&mut s,
"telemetry_core_connected_nodes{{aggregator=\"{}\"}} {} {}\n",
idx, m.connected_nodes, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n",
);
let _ = write!(
&mut s,
"telemetry_core_connected_shards{{aggregator=\"{}\"}} {} {}\n",
idx, m.connected_shards, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n",
);
let _ = write!(
&mut s,
"telemetry_core_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n",
idx, m.chains_subscribed_to, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n",
);
let _ = write!(
&mut s,
"telemetry_core_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.subscribed_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n",
);
let _ = write!(
&mut s,
"telemetry_core_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.subscribed_finality_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n",
);
let _ = write!(
&mut s,
"telemetry_core_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.total_messages_to_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
);
let _ = write!(
&mut s,
"telemetry_core_current_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
idx, m.current_messages_to_aggregator, m.timestamp_unix_ms
);
let _ = write!(
&mut s,
"telemetry_core_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
idx, m.total_messages_to_aggregator, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_dropped_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
);
let _ = write!(
&mut s,
"telemetry_core_dropped_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
idx, m.dropped_messages_to_aggregator, m.timestamp_unix_ms
));
);
}
Response::builder()