diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index 3dfd420..d529eb1 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -139,8 +139,7 @@ mod test { #[test] fn len_doesnt_panic_if_lots_of_retired() { - - let mut map = DenseMap::::new(); + let mut map = DenseMap::::new(); let id1 = map.add(1); let id2 = map.add(2); @@ -163,5 +162,4 @@ mod test { assert_eq!(map.len(), 0); } - -} \ No newline at end of file +} diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 9cb8ee7..b81529c 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -60,7 +60,9 @@ impl Sender { Ok(()) } /// Convert this sender into a Sink - pub fn into_sink(self) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { + pub fn into_sink( + self, + ) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { self.inner.into_sink() } } diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 8ba8cc2..d6604a8 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -62,11 +62,12 @@ impl Aggregator { let (tx_to_aggregator, rx_from_external) = flume::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests - let tx_to_locator = find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| { - future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation( - node_id, msg, - )) - })); + let tx_to_locator = + find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| { + future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation( + node_id, msg, + )) + })); // Handle any incoming messages in our handler loop: tokio::spawn(Aggregator::handle_messages( diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 8fb6251..7dd368d 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -2,9 +2,9 @@ use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; use futures::{Sink, SinkExt}; -use inner_loop::{ Metrics, FromShardWebsocket }; +use inner_loop::{FromShardWebsocket, Metrics}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{ Arc, Mutex }; +use std::sync::{Arc, Mutex}; #[derive(Clone)] pub struct AggregatorSet(Arc); @@ -12,7 +12,7 @@ pub struct AggregatorSet(Arc); pub struct AggregatorSetInner { aggregators: Vec, next_idx: AtomicUsize, - metrics: Mutex> + metrics: Mutex>, } impl AggregatorSet { @@ -28,14 +28,12 @@ impl AggregatorSet { ) .await?; - let initial_metrics = (0..num_aggregators) - .map(|_| Metrics::default()) - .collect(); + let initial_metrics = (0..num_aggregators).map(|_| Metrics::default()).collect(); let this = AggregatorSet(Arc::new(AggregatorSetInner { aggregators, next_idx: AtomicUsize::new(0), - metrics: Mutex::new(initial_metrics) + metrics: Mutex::new(initial_metrics), })); // Start asking for metrics: @@ -60,14 +58,16 @@ impl AggregatorSet { // loop has failed completely. Err(e) => { log::error!("Error obtaining metrics (bailing): {}", e); - return + return; } }; // Lock, update the stored metrics and drop the lock immediately. // We discard any error; if somethign went wrong talking to the inner loop, // it's probably a fatal error - { inner.metrics.lock().unwrap()[idx] = metrics; } + { + inner.metrics.lock().unwrap()[idx] = metrics; + } // Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll // end up waiting longer between requests. diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 72e468b..20ed612 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -102,7 +102,7 @@ pub enum FromFeedWebsocket { } /// A set of metrics returned when we ask for metrics -#[derive(Clone,Debug,Default)] +#[derive(Clone, Debug, Default)] pub struct Metrics { /// When in unix MS from epoch were these metrics obtained pub timestamp_unix_ms: u64, @@ -122,7 +122,7 @@ pub struct Metrics { /// How many feeds are currently connected to this aggregator. pub connected_feeds: usize, /// How many shards are currently connected to this aggregator. - pub connected_shards: usize + pub connected_shards: usize, } // The frontend sends text based commands; parse them into these messages: @@ -220,7 +220,7 @@ impl InnerLoop { } ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location) - }, + } ToAggregator::GatherMetrics(tx) => { self.handle_gather_metrics(tx, metered_rx.len()) } @@ -249,8 +249,11 @@ impl InnerLoop { } /// Gather and return some metrics. - fn handle_gather_metrics(&mut self, rx: flume::Sender, total_messages_to_aggregator: usize) { - + fn handle_gather_metrics( + &mut self, + rx: flume::Sender, + total_messages_to_aggregator: usize, + ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); let subscribed_feeds = self.feed_conn_id_to_chain.len(); @@ -258,10 +261,7 @@ impl InnerLoop { let subscribed_finality_feeds = self.feed_conn_id_finality.len(); let connected_shards = self.shard_channels.len(); let connected_feeds = self.feed_channels.len(); - let total_messages_to_feeds: usize = self.feed_channels - .values() - .map(|c| c.len()) - .sum(); + let total_messages_to_feeds: usize = self.feed_channels.values().map(|c| c.len()).sum(); // Ignore error sending; assume the receiver stopped caring and dropped the channel: let _ = rx.send(Metrics { @@ -273,7 +273,7 @@ impl InnerLoop { total_messages_to_aggregator, connected_nodes, connected_feeds, - connected_shards + connected_shards, }); } diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 0e2c61b..91308b0 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -178,11 +178,9 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> let _ = ws_send.close().await; }, )) - }, + } // Return metrics in a prometheus-friendly text based format: - (&Method::GET, "/metrics") => { - Ok(return_prometheus_metrics(aggregator).await) - }, + (&Method::GET, "/metrics") => Ok(return_prometheus_metrics(aggregator).await), // 404 for anything else: _ => Ok(Response::builder() .status(404) @@ -496,14 +494,38 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response Response Result<(), flume::SendError> { + pub fn send_json_text( + &mut self, + json: serde_json::Value, + ) -> Result<(), flume::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.unbounded_send(ws_client::SentMessage::Text(s)) }