From 92da674d4d3f4e57846942a3bba4c38b8c98286f Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 11 Aug 2021 15:58:48 +0100 Subject: [PATCH] Expose metrics in a format that prometheus understands --- .../src/aggregator/aggregator.rs | 11 ++ .../src/aggregator/aggregator_set.rs | 57 ++++++++- .../src/aggregator/inner_loop.rs | 119 ++++++++---------- backend/telemetry_core/src/main.rs | 31 ++++- 4 files changed, 141 insertions(+), 77 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 98f4935..8ba8cc2 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -98,6 +98,17 @@ impl Aggregator { .await; } + /// Gather metrics from our aggregator loop + pub async fn gather_metrics(&self) -> anyhow::Result { + let (tx, rx) = flume::unbounded(); + let msg = inner_loop::ToAggregator::GatherMetrics(tx); + + self.0.tx_to_aggregator.send_async(msg).await?; + + let metrics = rx.recv_async().await?; + Ok(metrics) + } + /// Return a sink that a shard can send messages into to be handled by the aggregator. 
pub fn subscribe_shard( &self, diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 885c119..8fb6251 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -2,9 +2,9 @@ use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; use futures::{Sink, SinkExt}; -use inner_loop::FromShardWebsocket; +use inner_loop::{ Metrics, FromShardWebsocket }; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{ Arc, Mutex }; #[derive(Clone)] pub struct AggregatorSet(Arc); @@ -12,6 +12,7 @@ pub struct AggregatorSet(Arc); pub struct AggregatorSetInner { aggregators: Vec, next_idx: AtomicUsize, + metrics: Mutex> } impl AggregatorSet { @@ -27,10 +28,58 @@ impl AggregatorSet { ) .await?; - Ok(AggregatorSet(Arc::new(AggregatorSetInner { + let initial_metrics = (0..num_aggregators) + .map(|_| Metrics::default()) + .collect(); + + let this = AggregatorSet(Arc::new(AggregatorSetInner { aggregators, next_idx: AtomicUsize::new(0), - }))) + metrics: Mutex::new(initial_metrics) + })); + + // Start asking for metrics: + this.spawn_metrics_loops(); + + Ok(this) + } + + /// Spawn loops which periodically ask for metrics from each internal aggregator. + /// Depending on how busy the aggregators are, these metrics won't necessarily be in + /// sync with each other. + fn spawn_metrics_loops(&self) { + let aggregators = self.0.aggregators.clone(); + for (idx, a) in aggregators.into_iter().enumerate() { + let inner = Arc::clone(&self.0); + tokio::spawn(async move { + loop { + let now = tokio::time::Instant::now(); + let metrics = match a.gather_metrics().await { + Ok(metrics) => metrics, + // Any error here is unlikely and probably means that the aggregator + // loop has failed completely. 
+ Err(e) => { + log::error!("Error obtaining metrics (bailing): {}", e); + return + } + }; + + // Lock, update the stored metrics and drop the lock immediately. + // We discard any error; if something went wrong talking to the inner loop, + // it's probably a fatal error + { inner.metrics.lock().unwrap()[idx] = metrics; } + + // Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll + // end up waiting longer between requests. + tokio::time::sleep_until(now + tokio::time::Duration::from_secs(10)).await; + } + }); + } + } + + /// Return the latest metrics we've gathered so far from each internal aggregator. + pub fn latest_metrics(&self) -> Vec { + self.0.metrics.lock().unwrap().clone() } /// Return a sink that a shard can send messages into to be handled by all aggregators. diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 6598bfa..72e468b 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -37,10 +37,9 @@ pub enum ToAggregator { FromShardWebsocket(ConnId, FromShardWebsocket), FromFeedWebsocket(ConnId, FromFeedWebsocket), FromFindLocation(NodeId, find_location::Location), - /// This message is sent periodically and allows us to monitor - /// or cleanup things in our inner loop. The channel provided - /// is notified when the interval has been handled. - Interval(flume::Sender<()>), + /// Hand back some metrics. The provided sender is expected not to block when + /// a message is sent into it. + GatherMetrics(flume::Sender), } /// An incoming shard connection can send these messages to the aggregator. 
@@ -102,6 +101,30 @@ pub enum FromFeedWebsocket { Disconnected, } +/// A set of metrics returned when we ask for metrics +#[derive(Clone,Debug,Default)] +pub struct Metrics { + /// When these metrics were obtained, in milliseconds since the Unix epoch. + pub timestamp_unix_ms: u64, + /// How many chains are feeds currently subscribed to. + pub chains_subscribed_to: usize, + /// How many feeds are currently subscribed to something. + pub subscribed_feeds: usize, + /// How many feeds have asked for finality information, too. + pub subscribed_finality_feeds: usize, + /// How many messages are currently queued up in internal channels + /// waiting to be sent out to feeds. + pub total_messages_to_feeds: usize, + /// How many messages are queued waiting to be handled by this aggregator. + pub total_messages_to_aggregator: usize, + /// How many nodes are currently known about by this aggregator. + pub connected_nodes: usize, + /// How many feeds are currently connected to this aggregator. + pub connected_feeds: usize, + /// How many shards are currently connected to this aggregator. + pub connected_shards: usize +} + // The frontend sends text based commands; parse them into these messages: impl FromStr for FromFeedWebsocket { type Err = anyhow::Error; @@ -198,45 +221,13 @@ impl InnerLoop { ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location) }, - ToAggregator::Interval(tx) => { - self.handle_interval(tx) + ToAggregator::GatherMetrics(tx) => { + self.handle_gather_metrics(tx, metered_rx.len()) } } } }); - // Periodically send interval messages for cleanup/monitoring. At most 1 - // every 60 seconds, but if the message queue is backed up it may take longer. 
- tokio::spawn({ - let metered_tx = metered_tx.clone(); - async move { - loop { - let now = tokio::time::Instant::now(); - let (tx, rx) = flume::unbounded(); - - let _ = metered_tx.send_async(ToAggregator::Interval(tx)).await; - let _ = rx.recv_async().await; - - tokio::time::sleep_until(now + tokio::time::Duration::from_secs(60)).await; - } - } - }); - - // TEMP: let's monitor message queue len out of interest - let tx_len = metered_tx.clone(); - std::thread::spawn(move || { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { - let mut n = 0; - loop { - println!("#{} Queue len: {}", n, tx_len.len()); - n += 1; - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - }); - }); - while let Ok(msg) = rx_from_external.recv_async().await { // ignore node updates if we have too many messages to handle, in an attempt // to reduce the queue length back to something reasonable, lest it get out of @@ -257,43 +248,33 @@ impl InnerLoop { } } - /// The periodic interval message gives us a chance to do some tidy up or monitoring. - fn handle_interval(&mut self, rx: flume::Sender<()>) { + /// Gather and return some metrics. 
+ fn handle_gather_metrics(&mut self, rx: flume::Sender, total_messages_to_aggregator: usize) { - let node_ids = self.node_ids.len(); - let node_count: usize = self.node_state - .iter_chains() - .map(|c| c.node_count()) - .sum(); - - let shard_cound = self.shard_channels.len(); + let timestamp_unix_ms = time::now(); + let connected_nodes = self.node_ids.len(); + let subscribed_feeds = self.feed_conn_id_to_chain.len(); + let chains_subscribed_to = self.chain_to_feed_conn_ids.len(); + let subscribed_finality_feeds = self.feed_conn_id_finality.len(); + let connected_shards = self.shard_channels.len(); let connected_feeds = self.feed_channels.len(); - let finality_feeds = self.feed_conn_id_finality.len(); - let feed_to_chain = self.feed_conn_id_to_chain.len(); - - let num_subscribed_chains = self.chain_to_feed_conn_ids.len(); - let num_subscribed_chain_feeds: usize = self.chain_to_feed_conn_ids + let total_messages_to_feeds: usize = self.feed_channels .values() .map(|c| c.len()) .sum(); - let num_messages_to_feeds: usize = self.feed_channels - .values() - .map(|c| c.len()) - .sum(); - - println!("Periodic update at {:?}:", std::time::SystemTime::now()); - dbg!(node_ids); - dbg!(node_count); - dbg!(shard_cound); - dbg!(connected_feeds); - dbg!(finality_feeds); - dbg!(feed_to_chain); - dbg!(num_subscribed_chains); - dbg!(num_subscribed_chain_feeds); - dbg!(num_messages_to_feeds); - - drop(rx); + // Ignore error sending; assume the receiver stopped caring and dropped the channel: + let _ = rx.send(Metrics { + timestamp_unix_ms, + chains_subscribed_to, + subscribed_feeds, + subscribed_finality_feeds, + total_messages_to_feeds, + total_messages_to_aggregator, + connected_nodes, + connected_feeds, + connected_shards + }); } /// Handle messages that come from the node geographical locator. 
diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index c55ade2..1dc039d 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -178,7 +178,11 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> let _ = ws_send.close().await; }, )) - } + }, + // Return metrics in a prometheus-friendly text based format: + (&Method::GET, "/metrics") => { + Ok(return_prometheus_metrics(aggregator).await) + }, // 404 for anything else: _ => Ok(Response::builder() .status(404) @@ -427,9 +431,7 @@ where Some(msgs) => msgs, None => break, }; -if _feed_id == 1 { - println!("FEED 1 message len: {}", msgs.len()); -} + // There is only one message type at the mo; bytes to send // to the websocket. collect them all up to dispatch in one shot. let all_msg_bytes = msgs.into_iter().map(|msg| match msg { @@ -481,3 +483,24 @@ if _feed_id == 1 { // loop ended; give socket back to parent: (tx_to_aggregator, ws_send) } + +async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response { + let metrics = aggregator.latest_metrics(); + + let mut s = String::new(); + for (idx, m) in metrics.iter().enumerate() { + s.push_str(&format!("telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_nodes, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_shards, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n", idx, m.chains_subscribed_to, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_finality_feeds, m.timestamp_unix_ms)); + 
s.push_str(&format!("telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.total_messages_to_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n", idx, m.total_messages_to_aggregator, m.timestamp_unix_ms)); + } + + Response::builder() + .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(s.into()) + .unwrap() +} \ No newline at end of file