cargo fmt

This commit is contained in:
James Wilson
2021-08-12 11:21:36 +01:00
parent 9017f328f0
commit 230987036a
8 changed files with 70 additions and 44 deletions
+2 -4
View File
@@ -139,8 +139,7 @@ mod test {
#[test]
fn len_doesnt_panic_if_lots_of_retired() {
let mut map = DenseMap::<usize,usize>::new();
let mut map = DenseMap::<usize, usize>::new();
let id1 = map.add(1);
let id2 = map.add(2);
@@ -163,5 +162,4 @@ mod test {
assert_eq!(map.len(), 0);
}
}
}
+3 -1
View File
@@ -60,7 +60,9 @@ impl Sender {
Ok(())
}
/// Convert this sender into a Sink
pub fn into_sink(self) -> impl futures::Sink<SentMessage> + std::marker::Unpin + Clone + 'static {
pub fn into_sink(
self,
) -> impl futures::Sink<SentMessage> + std::marker::Unpin + Clone + 'static {
self.inner.into_sink()
}
}
@@ -62,11 +62,12 @@ impl Aggregator {
let (tx_to_aggregator, rx_from_external) = flume::unbounded();
// Kick off a locator task to locate nodes, which hands back a channel to make location requests
let tx_to_locator = find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| {
future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation(
node_id, msg,
))
}));
let tx_to_locator =
find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| {
future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation(
node_id, msg,
))
}));
// Handle any incoming messages in our handler loop:
tokio::spawn(Aggregator::handle_messages(
@@ -2,9 +2,9 @@ use super::aggregator::{Aggregator, AggregatorOpts};
use super::inner_loop;
use common::EitherSink;
use futures::{Sink, SinkExt};
use inner_loop::{ Metrics, FromShardWebsocket };
use inner_loop::{FromShardWebsocket, Metrics};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{ Arc, Mutex };
use std::sync::{Arc, Mutex};
#[derive(Clone)]
pub struct AggregatorSet(Arc<AggregatorSetInner>);
@@ -12,7 +12,7 @@ pub struct AggregatorSet(Arc<AggregatorSetInner>);
pub struct AggregatorSetInner {
aggregators: Vec<Aggregator>,
next_idx: AtomicUsize,
metrics: Mutex<Vec<Metrics>>
metrics: Mutex<Vec<Metrics>>,
}
impl AggregatorSet {
@@ -28,14 +28,12 @@ impl AggregatorSet {
)
.await?;
let initial_metrics = (0..num_aggregators)
.map(|_| Metrics::default())
.collect();
let initial_metrics = (0..num_aggregators).map(|_| Metrics::default()).collect();
let this = AggregatorSet(Arc::new(AggregatorSetInner {
aggregators,
next_idx: AtomicUsize::new(0),
metrics: Mutex::new(initial_metrics)
metrics: Mutex::new(initial_metrics),
}));
// Start asking for metrics:
@@ -60,14 +58,16 @@ impl AggregatorSet {
// loop has failed completely.
Err(e) => {
log::error!("Error obtaining metrics (bailing): {}", e);
return
return;
}
};
// Lock, update the stored metrics and drop the lock immediately.
// We discard any error; if somethign went wrong talking to the inner loop,
// it's probably a fatal error
{ inner.metrics.lock().unwrap()[idx] = metrics; }
{
inner.metrics.lock().unwrap()[idx] = metrics;
}
// Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll
// end up waiting longer between requests.
@@ -102,7 +102,7 @@ pub enum FromFeedWebsocket {
}
/// A set of metrics returned when we ask for metrics
#[derive(Clone,Debug,Default)]
#[derive(Clone, Debug, Default)]
pub struct Metrics {
/// When in unix MS from epoch were these metrics obtained
pub timestamp_unix_ms: u64,
@@ -122,7 +122,7 @@ pub struct Metrics {
/// How many feeds are currently connected to this aggregator.
pub connected_feeds: usize,
/// How many shards are currently connected to this aggregator.
pub connected_shards: usize
pub connected_shards: usize,
}
// The frontend sends text based commands; parse them into these messages:
@@ -220,7 +220,7 @@ impl InnerLoop {
}
ToAggregator::FromFindLocation(node_id, location) => {
self.handle_from_find_location(node_id, location)
},
}
ToAggregator::GatherMetrics(tx) => {
self.handle_gather_metrics(tx, metered_rx.len())
}
@@ -249,8 +249,11 @@ impl InnerLoop {
}
/// Gather and return some metrics.
fn handle_gather_metrics(&mut self, rx: flume::Sender<Metrics>, total_messages_to_aggregator: usize) {
fn handle_gather_metrics(
&mut self,
rx: flume::Sender<Metrics>,
total_messages_to_aggregator: usize,
) {
let timestamp_unix_ms = time::now();
let connected_nodes = self.node_ids.len();
let subscribed_feeds = self.feed_conn_id_to_chain.len();
@@ -258,10 +261,7 @@ impl InnerLoop {
let subscribed_finality_feeds = self.feed_conn_id_finality.len();
let connected_shards = self.shard_channels.len();
let connected_feeds = self.feed_channels.len();
let total_messages_to_feeds: usize = self.feed_channels
.values()
.map(|c| c.len())
.sum();
let total_messages_to_feeds: usize = self.feed_channels.values().map(|c| c.len()).sum();
// Ignore error sending; assume the receiver stopped caring and dropped the channel:
let _ = rx.send(Metrics {
@@ -273,7 +273,7 @@ impl InnerLoop {
total_messages_to_aggregator,
connected_nodes,
connected_feeds,
connected_shards
connected_shards,
});
}
+35 -13
View File
@@ -178,11 +178,9 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()>
let _ = ws_send.close().await;
},
))
},
}
// Return metrics in a prometheus-friendly text based format:
(&Method::GET, "/metrics") => {
Ok(return_prometheus_metrics(aggregator).await)
},
(&Method::GET, "/metrics") => Ok(return_prometheus_metrics(aggregator).await),
// 404 for anything else:
_ => Ok(Response::builder()
.status(404)
@@ -496,14 +494,38 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper:
// be handled correctly when pointing a current version of prometheus at it.
let mut s = String::new();
for (idx, m) in metrics.iter().enumerate() {
s.push_str(&format!("telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_nodes, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_shards, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n", idx, m.chains_subscribed_to, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_finality_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.total_messages_to_feeds, m.timestamp_unix_ms));
s.push_str(&format!("telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n", idx, m.total_messages_to_aggregator, m.timestamp_unix_ms));
s.push_str(&format!(
"telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.connected_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n",
idx, m.connected_nodes, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n",
idx, m.connected_shards, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n",
idx, m.chains_subscribed_to, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.subscribed_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.subscribed_finality_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n",
idx, m.total_messages_to_feeds, m.timestamp_unix_ms
));
s.push_str(&format!(
"telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n",
idx, m.total_messages_to_aggregator, m.timestamp_unix_ms
));
}
Response::builder()
@@ -511,4 +533,4 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response<hyper:
.header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4")
.body(s.into())
.unwrap()
}
}
+1 -1
View File
@@ -242,7 +242,7 @@ struct SoakTestOpts {
log_output: bool,
/// How many worker threads should the soak test runner use?
#[structopt(long, default_value = "4")]
test_worker_threads: usize
test_worker_threads: usize,
}
/// Get soak test args from an envvar and parse them via structopt.
+4 -1
View File
@@ -42,7 +42,10 @@ impl ShardSender {
self.unbounded_send(ws_client::SentMessage::Binary(bytes))
}
/// Send JSON as a textual websocket message
pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), flume::SendError<ws_client::SentMessage>> {
pub fn send_json_text(
&mut self,
json: serde_json::Value,
) -> Result<(), flume::SendError<ws_client::SentMessage>> {
let s = serde_json::to_string(&json).expect("valid string");
self.unbounded_send(ws_client::SentMessage::Text(s))
}