Expose metrics in a format that prometheus understands

This commit is contained in:
James Wilson
2021-08-11 15:58:48 +01:00
parent 4f7b2c8ec5
commit 92da674d4d
4 changed files with 141 additions and 77 deletions
@@ -98,6 +98,17 @@ impl Aggregator {
.await;
}
/// Gather metrics from our aggregator loop
pub async fn gather_metrics(&self) -> anyhow::Result<inner_loop::Metrics> {
let (tx, rx) = flume::unbounded();
let msg = inner_loop::ToAggregator::GatherMetrics(tx);
self.0.tx_to_aggregator.send_async(msg).await?;
let metrics = rx.recv_async().await?;
Ok(metrics)
}
/// Return a sink that a shard can send messages into to be handled by the aggregator.
pub fn subscribe_shard(
&self,
@@ -2,9 +2,9 @@ use super::aggregator::{Aggregator, AggregatorOpts};
use super::inner_loop;
use common::EitherSink;
use futures::{Sink, SinkExt};
use inner_loop::FromShardWebsocket;
use inner_loop::{ Metrics, FromShardWebsocket };
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{ Arc, Mutex };
#[derive(Clone)]
pub struct AggregatorSet(Arc<AggregatorSetInner>);
@@ -12,6 +12,7 @@ pub struct AggregatorSet(Arc<AggregatorSetInner>);
pub struct AggregatorSetInner {
aggregators: Vec<Aggregator>,
next_idx: AtomicUsize,
metrics: Mutex<Vec<Metrics>>
}
impl AggregatorSet {
@@ -27,10 +28,58 @@ impl AggregatorSet {
)
.await?;
Ok(AggregatorSet(Arc::new(AggregatorSetInner {
let initial_metrics = (0..num_aggregators)
.map(|_| Metrics::default())
.collect();
let this = AggregatorSet(Arc::new(AggregatorSetInner {
aggregators,
next_idx: AtomicUsize::new(0),
})))
metrics: Mutex::new(initial_metrics)
}));
// Start asking for metrics:
this.spawn_metrics_loops();
Ok(this)
}
/// Spawn loops which periodically ask for metrics from each internal aggregator.
/// Depending on how busy the aggregators are, these metrics won't necessarily be in
/// sync with each other.
fn spawn_metrics_loops(&self) {
let aggregators = self.0.aggregators.clone();
for (idx, a) in aggregators.into_iter().enumerate() {
let inner = Arc::clone(&self.0);
tokio::spawn(async move {
loop {
let now = tokio::time::Instant::now();
let metrics = match a.gather_metrics().await {
Ok(metrics) => metrics,
// Any error here is unlikely and probably means that the aggregator
// loop has failed completely.
Err(e) => {
log::error!("Error obtaining metrics (bailing): {}", e);
return
}
};
// Lock, update the stored metrics and drop the lock immediately.
// We discard any error; if somethign went wrong talking to the inner loop,
// it's probably a fatal error
{ inner.metrics.lock().unwrap()[idx] = metrics; }
// Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll
// end up waiting longer between requests.
tokio::time::sleep_until(now + tokio::time::Duration::from_secs(10)).await;
}
});
}
}
/// Return the latest metrics we've gathered so far from each internal aggregator.
pub fn latest_metrics(&self) -> Vec<Metrics> {
self.0.metrics.lock().unwrap().clone()
}
/// Return a sink that a shard can send messages into to be handled by all aggregators.
@@ -37,10 +37,9 @@ pub enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location),
/// This message is sent periodically and allows us to monitor
/// or cleanup things in our inner loop. The channel provided
/// is notified when the interval has been handled.
Interval(flume::Sender<()>),
/// Hand back some metrics. The provided sender is expected not to block when
/// a message it sent into it.
GatherMetrics(flume::Sender<Metrics>),
}
/// An incoming shard connection can send these messages to the aggregator.
@@ -102,6 +101,30 @@ pub enum FromFeedWebsocket {
Disconnected,
}
/// A set of metrics returned when we ask for metrics
#[derive(Clone,Debug,Default)]
pub struct Metrics {
/// When in unix MS from epoch were these metrics obtained
pub timestamp_unix_ms: u64,
/// How many chains are feeds currently subscribed to.
pub chains_subscribed_to: usize,
/// How many feeds are currently subscribed to something.
pub subscribed_feeds: usize,
/// How many feeds have asked for finality information, too.
pub subscribed_finality_feeds: usize,
/// How many messages are currently queued up in internal channels
/// waiting to be sent out to feeds.
pub total_messages_to_feeds: usize,
/// How many messages are queued waiting to be handled by this aggregator.
pub total_messages_to_aggregator: usize,
/// How many nodes are currently known about by this aggregator.
pub connected_nodes: usize,
/// How many feeds are currently connected to this aggregator.
pub connected_feeds: usize,
/// How many shards are currently connected to this aggregator.
pub connected_shards: usize
}
// The frontend sends text based commands; parse them into these messages:
impl FromStr for FromFeedWebsocket {
type Err = anyhow::Error;
@@ -198,45 +221,13 @@ impl InnerLoop {
ToAggregator::FromFindLocation(node_id, location) => {
self.handle_from_find_location(node_id, location)
},
ToAggregator::Interval(tx) => {
self.handle_interval(tx)
ToAggregator::GatherMetrics(tx) => {
self.handle_gather_metrics(tx, metered_rx.len())
}
}
}
});
// Periodically send interval messages for cleanup/monitoring. At most 1
// every 60 seconds, but if the message queue is backed up it may take longer.
tokio::spawn({
let metered_tx = metered_tx.clone();
async move {
loop {
let now = tokio::time::Instant::now();
let (tx, rx) = flume::unbounded();
let _ = metered_tx.send_async(ToAggregator::Interval(tx)).await;
let _ = rx.recv_async().await;
tokio::time::sleep_until(now + tokio::time::Duration::from_secs(60)).await;
}
}
});
// TEMP: let's monitor message queue len out of interest
let tx_len = metered_tx.clone();
std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let mut n = 0;
loop {
println!("#{} Queue len: {}", n, tx_len.len());
n += 1;
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
});
});
while let Ok(msg) = rx_from_external.recv_async().await {
// ignore node updates if we have too many messages to handle, in an attempt
// to reduce the queue length back to something reasonable, lest it get out of
@@ -257,43 +248,33 @@ impl InnerLoop {
}
}
/// The periodic interval message gives us a chance to do some tidy up or monitoring.
fn handle_interval(&mut self, rx: flume::Sender<()>) {
/// Gather and return some metrics.
fn handle_gather_metrics(&mut self, rx: flume::Sender<Metrics>, total_messages_to_aggregator: usize) {
let node_ids = self.node_ids.len();
let node_count: usize = self.node_state
.iter_chains()
.map(|c| c.node_count())
.sum();
let shard_cound = self.shard_channels.len();
let timestamp_unix_ms = time::now();
let connected_nodes = self.node_ids.len();
let subscribed_feeds = self.feed_conn_id_to_chain.len();
let chains_subscribed_to = self.chain_to_feed_conn_ids.len();
let subscribed_finality_feeds = self.feed_conn_id_finality.len();
let connected_shards = self.shard_channels.len();
let connected_feeds = self.feed_channels.len();
let finality_feeds = self.feed_conn_id_finality.len();
let feed_to_chain = self.feed_conn_id_to_chain.len();
let num_subscribed_chains = self.chain_to_feed_conn_ids.len();
let num_subscribed_chain_feeds: usize = self.chain_to_feed_conn_ids
let total_messages_to_feeds: usize = self.feed_channels
.values()
.map(|c| c.len())
.sum();
let num_messages_to_feeds: usize = self.feed_channels
.values()
.map(|c| c.len())
.sum();
println!("Periodic update at {:?}:", std::time::SystemTime::now());
dbg!(node_ids);
dbg!(node_count);
dbg!(shard_cound);
dbg!(connected_feeds);
dbg!(finality_feeds);
dbg!(feed_to_chain);
dbg!(num_subscribed_chains);
dbg!(num_subscribed_chain_feeds);
dbg!(num_messages_to_feeds);
drop(rx);
// Ignore error sending; assume the receiver stopped caring and dropped the channel:
let _ = rx.send(Metrics {
timestamp_unix_ms,
chains_subscribed_to,
subscribed_feeds,
subscribed_finality_feeds,
total_messages_to_feeds,
total_messages_to_aggregator,
connected_nodes,
connected_feeds,
connected_shards
});
}
/// Handle messages that come from the node geographical locator.
+27 -4
View File
@@ -178,7 +178,11 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()>
let _ = ws_send.close().await;
},
))
}
},
// Return metrics in a prometheus-friendly text based format:
(&Method::GET, "/metrics") => {
Ok(return_prometheus_metrics(aggregator).await)
},
// 404 for anything else:
_ => Ok(Response::builder()
.status(404)
@@ -427,9 +431,7 @@ where
Some(msgs) => msgs,
None => break,
};
if _feed_id == 1 {
println!("FEED 1 message len: {}", msgs.len());
}
// There is only one message type at the mo; bytes to send
// to the websocket. collect them all up to dispatch in one shot.
let all_msg_bytes = msgs.into_iter().map(|msg| match msg {
@@ -481,3 +483,24 @@ if _feed_id == 1 {
// loop ended; give socket back to parent:
(tx_to_aggregator, ws_send)
}
async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper::Body> {
let metrics = aggregator.latest_metrics();
let mut s = String::new();
for (idx, m) in metrics.iter().enumerate() {
s.push_str(&format!("telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_nodes, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_shards, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n", idx, m.chains_subscribed_to, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_finality_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.total_messages_to_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n", idx, m.total_messages_to_aggregator, m.timestamp_unix_ms));
}
Response::builder()
.header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4")
.body(s.into())
.unwrap()
}