From 87c0ee7d0def7cebb2fe7cf19eed0e7d48787284 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 9 Aug 2021 16:11:41 +0100 Subject: [PATCH 01/22] monitor aggregator length (dont discard msgs yet) --- .../common/src/channel/metered_unbounded.rs | 191 ++++++++++++++++++ backend/common/src/channel/mod.rs | 3 + backend/common/src/lib.rs | 1 + .../src/aggregator/aggregator.rs | 10 +- .../src/aggregator/inner_loop.rs | 49 +++-- 5 files changed, 233 insertions(+), 21 deletions(-) create mode 100644 backend/common/src/channel/metered_unbounded.rs create mode 100644 backend/common/src/channel/mod.rs diff --git a/backend/common/src/channel/metered_unbounded.rs b/backend/common/src/channel/metered_unbounded.rs new file mode 100644 index 0000000..f76e440 --- /dev/null +++ b/backend/common/src/channel/metered_unbounded.rs @@ -0,0 +1,191 @@ +use futures::channel::mpsc::{ SendError, TrySendError, UnboundedSender, UnboundedReceiver, unbounded }; +use futures::{ Sink, Stream, SinkExt, StreamExt }; +use std::sync::atomic::{ AtomicUsize, Ordering }; +use std::sync::Arc; +use std::task::Poll; + +/// Create an unbounded channel where we record the current length of the message queue. +pub fn metered_unbounded() -> (MeteredUnboundedSender, MeteredUnboundedReceiver) { + let (tx, rx) = unbounded(); + let len = Arc::new(AtomicUsize::new(0)); + let len2 = Arc::clone(&len); + + let tx = MeteredUnboundedSender { + inner: tx, + len: len + }; + let rx = MeteredUnboundedReceiver { + inner: rx, + len: len2 + }; + + (tx, rx) +} + +/// This is similar to an `UnboundedSender`, except that we keep track +/// of the length of the internal message buffer. +#[derive(Debug, Clone)] +pub struct MeteredUnboundedSender { + inner: UnboundedSender, + len: Arc, +} + +impl MeteredUnboundedSender { + /// The current number of messages in the queue. + pub fn len(&self) -> usize { + self.len.load(Ordering::Relaxed) + } + + /// Send a message. + pub fn unbounded_send(&self, item: T) -> Result<(), TrySendError> { + self.len.fetch_add(1, Ordering::Relaxed); + self.inner.unbounded_send(item) + } +} + +impl Sink for MeteredUnboundedSender { + type Error = SendError; + + fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.unbounded_send(item).map_err(|e| e.into_send_error()) + } + + fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_flush_unpin(cx) + } + + fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_close_unpin(cx) + } +} + +impl Stream for MeteredUnboundedReceiver { + type Item = T; + + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let res = self.inner.poll_next_unpin(cx); + if matches!(res, Poll::Ready(Some(..))) { + self.len.fetch_sub(1, Ordering::Relaxed); + } + res + } +} + +/// This is similar to an `UnboundedReceiver`, except that we keep track +/// of the length of the internal message buffer. +#[derive(Debug)] +pub struct MeteredUnboundedReceiver { + inner: UnboundedReceiver, + len: Arc, +} + +impl MeteredUnboundedReceiver { + /// The current number of messages in the queue. + pub fn len(&self) -> usize { + self.len.load(Ordering::Relaxed) + } +} + +#[cfg(test)] +mod test { + + use super::*; + + #[tokio::test] + async fn channel_len_consistent_with_msgs() { + let (tx, mut rx) = metered_unbounded(); + + assert_eq!(tx.len(), 0); + tx.unbounded_send(1).unwrap(); + assert_eq!(tx.len(), 1); + tx.unbounded_send(2).unwrap(); + assert_eq!(tx.len(), 2); + tx.unbounded_send(3).unwrap(); + assert_eq!(tx.len(), 3); + + rx.next().await.unwrap(); + assert_eq!(tx.len(), 2); + rx.next().await.unwrap(); + assert_eq!(tx.len(), 1); + rx.next().await.unwrap(); + assert_eq!(tx.len(), 0); + } + + #[tokio::test] + async fn channel_len_consistent_with_msgs_sink() { + let (mut tx, mut rx) = metered_unbounded::(); + + assert_eq!(tx.len(), 0); + tx.send(1).await.unwrap(); + assert_eq!(tx.len(), 1); + tx.send(2).await.unwrap(); + assert_eq!(tx.len(), 2); + tx.send(3).await.unwrap(); + assert_eq!(tx.len(), 3); + + rx.next().await.unwrap(); + assert_eq!(tx.len(), 2); + rx.next().await.unwrap(); + assert_eq!(tx.len(), 1); + rx.next().await.unwrap(); + assert_eq!(tx.len(), 0); + } + + #[tokio::test] + async fn channel_len_consistent_when_send_parallelised() { + let (mut tx, mut rx) = metered_unbounded::(); + + // Send lots of messages on a bunch of real threads: + let mut join_handles = vec![]; + for _ in 0..50 { + let tx = tx.clone(); + let join_handle = std::thread::spawn(move || { + for i in 0..10000 { + tx.unbounded_send(i).unwrap(); + } + }); + join_handles.push(join_handle); + } + + // When they are done, our len should be accurate: + for handle in join_handles { + handle.join().unwrap(); + } + assert_eq!(tx.len(), 50 * 10_000); + + } + + #[tokio::test] + async fn channel_len_consistent_when_send_and_recv_parallelised() { + let (mut tx, mut rx) = metered_unbounded::(); + + // Send lots of messages on a bunch of real threads: + let mut join_handles = vec![]; + for _ in 0..50 { + let tx = tx.clone(); + let join_handle = std::thread::spawn(move || { + for i in 0..10000 { + tx.unbounded_send(i).unwrap(); + } + }); + join_handles.push(join_handle); + } + + // While this is happenening, we are trying to receive that same number of msgs: + for _ in 0..500_000 { + rx.next().await.unwrap(); + } + + // When they are done, our len should be accurate: + for handle in join_handles { + handle.join().unwrap(); + } + assert_eq!(tx.len(), 0); + + } + +} \ No newline at end of file diff --git a/backend/common/src/channel/mod.rs b/backend/common/src/channel/mod.rs new file mode 100644 index 0000000..dd4b412 --- /dev/null +++ b/backend/common/src/channel/mod.rs @@ -0,0 +1,3 @@ +mod metered_unbounded; + +pub use metered_unbounded::*; \ No newline at end of file diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 85e14fa..b863e45 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -24,6 +24,7 @@ pub mod ready_chunks_all; pub mod rolling_total; pub mod time; pub mod ws_client; +pub mod channel; mod assign_id; mod dense_map; diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 7c975ee..172a405 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -74,16 +74,16 @@ impl Aggregator { }))) } - // This is spawned into a separate task and handles any messages coming - // in to the aggregator. If nobody is tolding the tx side of the channel - // any more, this task will gracefully end. + /// This is spawned into a separate task and handles any messages coming + /// in to the aggregator. If nobody is tolding the tx side of the channel + /// any more, this task will gracefully end. async fn handle_messages( rx_from_external: mpsc::UnboundedReceiver, tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec, ) { - inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist) - .handle() + inner_loop::InnerLoop::new(tx_to_aggregator, denylist) + .handle(rx_from_external) .await; } diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 6cfec73..1b6e0fe 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -23,6 +23,7 @@ use common::{ internal_messages::{self, MuteReason, ShardNodeId}, node_message, node_types::BlockHash, + channel::metered_unbounded, time, }; use futures::channel::mpsc; @@ -127,9 +128,6 @@ pub enum ToFeedWebsocket { /// Instances of this are responsible for handling incoming and /// outgoing messages in the main aggregator loop. pub struct InnerLoop { - /// Messages from the outside world come into this: - rx_from_external: mpsc::UnboundedReceiver, - /// The state of our chains and nodes lives here: node_state: State, /// We maintain a mapping between NodeId and ConnId+LocalId, so that we know @@ -158,12 +156,10 @@ pub struct InnerLoop { impl InnerLoop { /// Create a new inner loop handler with the various state it needs. pub fn new( - rx_from_external: mpsc::UnboundedReceiver, tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec, ) -> Self { InnerLoop { - rx_from_external, node_state: State::new(denylist), node_ids: BiMap::new(), feed_channels: HashMap::new(), @@ -178,19 +174,40 @@ impl InnerLoop { /// Start handling and responding to incoming messages. Owing to unbounded channels, we actually /// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop /// will be able to make progress quickly without any potential yield points. - pub async fn handle(mut self) { - while let Some(msg) = self.rx_from_external.next().await { - match msg { - ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { - self.handle_from_feed(feed_conn_id, msg) - } - ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { - self.handle_from_shard(shard_conn_id, msg) - } - ToAggregator::FromFindLocation(node_id, location) => { - self.handle_from_find_location(node_id, location) + pub async fn handle(mut self, mut rx_from_external: mpsc::UnboundedReceiver) { + + let (metered_tx, mut metered_rx) = metered_unbounded(); + + tokio::spawn(async move { + while let Some(msg) = metered_rx.next().await { + match msg { + ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { + self.handle_from_feed(feed_conn_id, msg) + } + ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { + self.handle_from_shard(shard_conn_id, msg) + } + ToAggregator::FromFindLocation(node_id, location) => { + self.handle_from_find_location(node_id, location) + } } } + }); + + // TEMP: let's monitor message queue len out of interest + let tx_len = metered_tx.clone(); + tokio::spawn(async move { + loop { + println!("Queue len: {}", tx_len.len()); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await + } + }); + + while let Some(msg) = rx_from_external.next().await { + if let Err(e) = metered_tx.unbounded_send(msg) { + log::error!("Cannot send message into aggregator: {}", e); + break; + } } } From b97aec99a8639bf110418389602837581087ad6e Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 9 Aug 2021 16:38:23 +0100 Subject: [PATCH 02/22] monitoring queue len --- backend/telemetry_core/src/aggregator/inner_loop.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 1b6e0fe..4715c9e 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -197,9 +197,11 @@ impl InnerLoop { // TEMP: let's monitor message queue len out of interest let tx_len = metered_tx.clone(); tokio::spawn(async move { + let mut n = 0; loop { - println!("Queue len: {}", tx_len.len()); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await + println!("#{} Queue len: {}", n, tx_len.len()); + n += 1; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } }); From 968dd2b957cc84c80b1285dcc7928b0fb14deda7 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 9 Aug 2021 16:50:09 +0100 Subject: [PATCH 03/22] Try to force new thread for msg counter to ensure it has time to print --- .../telemetry_core/src/aggregator/inner_loop.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 4715c9e..2426090 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -196,13 +196,15 @@ impl InnerLoop { // TEMP: let's monitor message queue len out of interest let tx_len = metered_tx.clone(); - tokio::spawn(async move { - let mut n = 0; - loop { - println!("#{} Queue len: {}", n, tx_len.len()); - n += 1; - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } + std::thread::spawn(move || { + tokio::runtime::Runtime::new().unwrap().block_on(async move { + let mut n = 0; + loop { + println!("#{} Queue len: {}", n, tx_len.len()); + n += 1; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } + }); }); while let Some(msg) = rx_from_external.next().await { From 98c9ccd278fc15ef1d231182a54a3714deb44a0b Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 9 Aug 2021 17:52:56 +0100 Subject: [PATCH 04/22] fmt, clean warnings, tidy aggregator opts and add queue length limit --- .../common/src/channel/metered_unbounded.rs | 49 ++++++++++++------- backend/common/src/channel/mod.rs | 2 +- backend/common/src/lib.rs | 2 +- .../src/aggregator/aggregator.rs | 18 +++++-- .../src/aggregator/aggregator_set.rs | 6 +-- .../src/aggregator/inner_loop.rs | 39 +++++++++++---- backend/telemetry_core/src/aggregator/mod.rs | 1 + backend/telemetry_core/src/main.rs | 17 ++++++- 8 files changed, 95 insertions(+), 39 deletions(-) diff --git a/backend/common/src/channel/metered_unbounded.rs b/backend/common/src/channel/metered_unbounded.rs index f76e440..81c8f00 100644 --- a/backend/common/src/channel/metered_unbounded.rs +++ b/backend/common/src/channel/metered_unbounded.rs @@ -1,6 +1,8 @@ -use futures::channel::mpsc::{ SendError, TrySendError, UnboundedSender, UnboundedReceiver, unbounded }; -use futures::{ Sink, Stream, SinkExt, StreamExt }; -use std::sync::atomic::{ AtomicUsize, Ordering }; +use futures::channel::mpsc::{ + unbounded, SendError, TrySendError, UnboundedReceiver, UnboundedSender, +}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; @@ -12,11 +14,11 @@ pub fn metered_unbounded() -> (MeteredUnboundedSender, MeteredUnboundedRec let tx = MeteredUnboundedSender { inner: tx, - len: len + len: len, }; let rx = MeteredUnboundedReceiver { inner: rx, - len: len2 + len: len2, }; (tx, rx) @@ -30,7 +32,7 @@ pub struct MeteredUnboundedSender { len: Arc, } -impl MeteredUnboundedSender { +impl MeteredUnboundedSender { /// The current number of messages in the queue. pub fn len(&self) -> usize { self.len.load(Ordering::Relaxed) @@ -43,10 +45,13 @@ impl MeteredUnboundedSender { } } -impl Sink for MeteredUnboundedSender { +impl Sink for MeteredUnboundedSender { type Error = SendError; - fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { self.inner.poll_ready(cx) } @@ -54,19 +59,28 @@ impl Sink for MeteredUnboundedSender { self.unbounded_send(item).map_err(|e| e.into_send_error()) } - fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { self.inner.poll_flush_unpin(cx) } - fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { self.inner.poll_close_unpin(cx) } } -impl Stream for MeteredUnboundedReceiver { +impl Stream for MeteredUnboundedReceiver { type Item = T; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { let res = self.inner.poll_next_unpin(cx); if matches!(res, Poll::Ready(Some(..))) { self.len.fetch_sub(1, Ordering::Relaxed); @@ -83,7 +97,7 @@ pub struct MeteredUnboundedReceiver { len: Arc, } -impl MeteredUnboundedReceiver { +impl MeteredUnboundedReceiver { /// The current number of messages in the queue. pub fn len(&self) -> usize { self.len.load(Ordering::Relaxed) @@ -137,7 +151,7 @@ mod test { #[tokio::test] async fn channel_len_consistent_when_send_parallelised() { - let (mut tx, mut rx) = metered_unbounded::(); + let (tx, _rx) = metered_unbounded::(); // Send lots of messages on a bunch of real threads: let mut join_handles = vec![]; @@ -156,12 +170,11 @@ mod test { handle.join().unwrap(); } assert_eq!(tx.len(), 50 * 10_000); - } #[tokio::test] async fn channel_len_consistent_when_send_and_recv_parallelised() { - let (mut tx, mut rx) = metered_unbounded::(); + let (tx, mut rx) = metered_unbounded::(); // Send lots of messages on a bunch of real threads: let mut join_handles = vec![]; @@ -185,7 +198,5 @@ mod test { handle.join().unwrap(); } assert_eq!(tx.len(), 0); - } - -} \ No newline at end of file +} diff --git a/backend/common/src/channel/mod.rs b/backend/common/src/channel/mod.rs index dd4b412..3b1b209 100644 --- a/backend/common/src/channel/mod.rs +++ b/backend/common/src/channel/mod.rs @@ -1,3 +1,3 @@ mod metered_unbounded; -pub use metered_unbounded::*; \ No newline at end of file +pub use metered_unbounded::*; diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index b863e45..690f118 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . pub mod byte_size; +pub mod channel; pub mod http_utils; pub mod id_type; pub mod internal_messages; @@ -24,7 +25,6 @@ pub mod ready_chunks_all; pub mod rolling_total; pub mod time; pub mod ws_client; -pub mod channel; mod assign_id; mod dense_map; diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 172a405..7f644c2 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -34,6 +34,16 @@ id_type! { #[derive(Clone)] pub struct Aggregator(Arc); +/// Options to configure the aggregator loop(s) +#[derive(Debug, Clone)] +pub struct AggregatorOpts { + /// Any node from these chains is muted + pub denylist: Vec, + /// If our incoming message queue exceeds this length, we start + /// dropping non-essential messages. + pub max_queue_len: usize, +} + struct AggregatorInternal { /// Shards that connect are each assigned a unique connection ID. /// This helps us know who to send messages back to (especially in @@ -49,7 +59,7 @@ struct AggregatorInternal { impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend - pub async fn spawn(denylist: Vec) -> anyhow::Result { + pub async fn spawn(opts: AggregatorOpts) -> anyhow::Result { let (tx_to_aggregator, rx_from_external) = mpsc::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests @@ -63,7 +73,8 @@ impl Aggregator { tokio::spawn(Aggregator::handle_messages( rx_from_external, tx_to_locator, - denylist, + opts.max_queue_len, + opts.denylist, )); // Return a handle to our aggregator: @@ -80,9 +91,10 @@ impl Aggregator { async fn handle_messages( rx_from_external: mpsc::UnboundedReceiver, tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + max_queue_len: usize, denylist: Vec, ) { - inner_loop::InnerLoop::new(tx_to_aggregator, denylist) + inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len) .handle(rx_from_external) .await; } diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 8ebfa25..5fa991b 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -1,4 +1,4 @@ -use super::aggregator::Aggregator; +use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; use futures::{Sink, SinkExt, StreamExt}; @@ -18,12 +18,12 @@ impl AggregatorSet { /// Spawn the number of aggregators we're asked to. pub async fn spawn( num_aggregators: usize, - denylist: Vec, + opts: AggregatorOpts, ) -> anyhow::Result { assert_ne!(num_aggregators, 0, "You must have 1 or more aggregator"); let aggregators = futures::future::try_join_all( - (0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())), + (0..num_aggregators).map(|_| Aggregator::spawn(opts.clone())), ) .await?; diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 2426090..abfd1fc 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -20,10 +20,10 @@ use crate::find_location; use crate::state::{self, NodeId, State}; use bimap::BiMap; use common::{ + channel::metered_unbounded, internal_messages::{self, MuteReason, ShardNodeId}, node_message, node_types::BlockHash, - channel::metered_unbounded, time, }; use futures::channel::mpsc; @@ -151,6 +151,10 @@ pub struct InnerLoop { /// Send messages here to make geographical location requests. tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + + /// How big can the queue of messages coming in to the aggregator get before messages + /// are prioritised and dropped to try and get back on track. + max_queue_len: usize, } impl InnerLoop { @@ -158,6 +162,7 @@ impl InnerLoop { pub fn new( tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec, + max_queue_len: usize, ) -> Self { InnerLoop { node_state: State::new(denylist), @@ -168,6 +173,7 @@ impl InnerLoop { chain_to_feed_conn_ids: HashMap::new(), feed_conn_id_finality: HashSet::new(), tx_to_locator, + max_queue_len, } } @@ -175,7 +181,7 @@ impl InnerLoop { /// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop /// will be able to make progress quickly without any potential yield points. pub async fn handle(mut self, mut rx_from_external: mpsc::UnboundedReceiver) { - + let max_queue_len = self.max_queue_len; let (metered_tx, mut metered_rx) = metered_unbounded(); tokio::spawn(async move { @@ -197,17 +203,30 @@ impl InnerLoop { // TEMP: let's monitor message queue len out of interest let tx_len = metered_tx.clone(); std::thread::spawn(move || { - tokio::runtime::Runtime::new().unwrap().block_on(async move { - let mut n = 0; - loop { - println!("#{} Queue len: {}", n, tx_len.len()); - n += 1; - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - }); + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let mut n = 0; + loop { + println!("#{} Queue len: {}", n, tx_len.len()); + n += 1; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } + }); }); while let Some(msg) = rx_from_external.next().await { + // ignore node updates if we have too many messages to handle, in an attempt + // to reduce the queue length back to something reasonable. + if metered_tx.len() > max_queue_len { + if matches!( + msg, + ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }) + ) { + continue; + } + } + if let Err(e) = metered_tx.unbounded_send(msg) { log::error!("Cannot send message into aggregator: {}", e); break; diff --git a/backend/telemetry_core/src/aggregator/mod.rs b/backend/telemetry_core/src/aggregator/mod.rs index 2865ed9..9caab51 100644 --- a/backend/telemetry_core/src/aggregator/mod.rs +++ b/backend/telemetry_core/src/aggregator/mod.rs @@ -19,6 +19,7 @@ mod aggregator_set; mod inner_loop; // Expose the various message types that can be worked with externally: +pub use aggregator::AggregatorOpts; pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket}; pub use aggregator_set::*; diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index e7f1414..beed2ec 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -22,7 +22,8 @@ use std::str::FromStr; use tokio::time::{Duration, Instant}; use aggregator::{ - AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket, + AggregatorOpts, AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, + ToShardWebsocket, }; use bincode::Options; use common::http_utils; @@ -67,6 +68,10 @@ struct Opts { /// aggregators. #[structopt(long)] num_aggregators: Option, + /// How big can the message queue for each aggregator grow before we start dropping non-essential + /// messages in an attempt to let it reduce? + #[structopt(long)] + aggregator_queue_len: Option, } fn main() { @@ -110,7 +115,15 @@ fn main() { /// Declare our routes and start the server. async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> { - let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?; + let aggregator_queue_len = opts.aggregator_queue_len.unwrap_or(10_000); + let aggregator = AggregatorSet::spawn( + num_aggregators, + AggregatorOpts { + max_queue_len: aggregator_queue_len, + denylist: opts.denylist, + }, + ) + .await?; let socket_addr = opts.socket; let feed_timeout = opts.feed_timeout; From 8268cf2afe900481f5bfadb72a05d5480321106b Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 9 Aug 2021 18:05:01 +0100 Subject: [PATCH 05/22] print feed 1 msg len --- backend/telemetry_core/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index beed2ec..d96ab9d 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -363,7 +363,6 @@ where let recv_handle = tokio::spawn(async move { loop { let mut bytes = Vec::new(); - // Receive a message, or bail if closer called. We don't care about cancel safety; // if we're halfway through receiving a message, no biggie since we're closing the // connection anyway. @@ -427,7 +426,9 @@ where Some(msgs) => msgs, None => break, }; - +if _feed_id == 1 { + println!("FEED 1 message len: {}", msgs.len()); +} // There is only one message type at the mo; bytes to send // to the websocket. collect them all up to dispatch in one shot. let all_msg_bytes = msgs.into_iter().map(|msg| match msg { From 703a9ddc4e9db5a460441b6953426db8bba0c8f0 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 10:26:13 +0100 Subject: [PATCH 06/22] use flume throughout telemetry_core --- backend/Cargo.lock | 54 +++++ .../common/src/channel/metered_unbounded.rs | 202 ------------------ backend/common/src/channel/mod.rs | 3 - backend/common/src/lib.rs | 1 - backend/telemetry_core/Cargo.toml | 1 + .../src/aggregator/aggregator.rs | 17 +- .../src/aggregator/inner_loop.rs | 52 ++--- backend/telemetry_core/src/find_location.rs | 6 +- backend/telemetry_core/src/main.rs | 9 +- 9 files changed, 98 insertions(+), 247 deletions(-) delete mode 100644 backend/common/src/channel/metered_unbounded.rs delete mode 100644 backend/common/src/channel/mod.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 2536d50..c9235fe 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -392,6 +392,19 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "flume" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e90cc80fad5bb391b38127896b0fa27d97e7fef74742797f4da518d67e1292f" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spinning_top", +] + [[package]] name = "fnv" version = "1.0.7" @@ -540,8 +553,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -827,6 +842,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nanorand" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.7" @@ -977,6 +1001,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -1458,6 +1502,15 @@ dependencies = [ "sha-1", ] +[[package]] +name = "spinning_top" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75adad84ee84b521fb2cca2d4fd0f1dab1d8d026bda3c5bea4ca63b5f9f9293c" +dependencies = [ + "lock_api", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -1521,6 +1574,7 @@ dependencies = [ "bytes", "common", "criterion", + "flume", "futures", "hex", "http", diff --git a/backend/common/src/channel/metered_unbounded.rs b/backend/common/src/channel/metered_unbounded.rs deleted file mode 100644 index 81c8f00..0000000 --- a/backend/common/src/channel/metered_unbounded.rs +++ /dev/null @@ -1,202 +0,0 @@ -use futures::channel::mpsc::{ - unbounded, SendError, TrySendError, UnboundedReceiver, UnboundedSender, -}; -use futures::{Sink, SinkExt, Stream, StreamExt}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::Poll; - -/// Create an unbounded channel where we record the current length of the message queue. -pub fn metered_unbounded() -> (MeteredUnboundedSender, MeteredUnboundedReceiver) { - let (tx, rx) = unbounded(); - let len = Arc::new(AtomicUsize::new(0)); - let len2 = Arc::clone(&len); - - let tx = MeteredUnboundedSender { - inner: tx, - len: len, - }; - let rx = MeteredUnboundedReceiver { - inner: rx, - len: len2, - }; - - (tx, rx) -} - -/// This is similar to an `UnboundedSender`, except that we keep track -/// of the length of the internal message buffer. -#[derive(Debug, Clone)] -pub struct MeteredUnboundedSender { - inner: UnboundedSender, - len: Arc, -} - -impl MeteredUnboundedSender { - /// The current number of messages in the queue. - pub fn len(&self) -> usize { - self.len.load(Ordering::Relaxed) - } - - /// Send a message. - pub fn unbounded_send(&self, item: T) -> Result<(), TrySendError> { - self.len.fetch_add(1, Ordering::Relaxed); - self.inner.unbounded_send(item) - } -} - -impl Sink for MeteredUnboundedSender { - type Error = SendError; - - fn poll_ready( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.inner.poll_ready(cx) - } - - fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.unbounded_send(item).map_err(|e| e.into_send_error()) - } - - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.inner.poll_flush_unpin(cx) - } - - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.inner.poll_close_unpin(cx) - } -} - -impl Stream for MeteredUnboundedReceiver { - type Item = T; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let res = self.inner.poll_next_unpin(cx); - if matches!(res, Poll::Ready(Some(..))) { - self.len.fetch_sub(1, Ordering::Relaxed); - } - res - } -} - -/// This is similar to an `UnboundedReceiver`, except that we keep track -/// of the length of the internal message buffer. -#[derive(Debug)] -pub struct MeteredUnboundedReceiver { - inner: UnboundedReceiver, - len: Arc, -} - -impl MeteredUnboundedReceiver { - /// The current number of messages in the queue. - pub fn len(&self) -> usize { - self.len.load(Ordering::Relaxed) - } -} - -#[cfg(test)] -mod test { - - use super::*; - - #[tokio::test] - async fn channel_len_consistent_with_msgs() { - let (tx, mut rx) = metered_unbounded(); - - assert_eq!(tx.len(), 0); - tx.unbounded_send(1).unwrap(); - assert_eq!(tx.len(), 1); - tx.unbounded_send(2).unwrap(); - assert_eq!(tx.len(), 2); - tx.unbounded_send(3).unwrap(); - assert_eq!(tx.len(), 3); - - rx.next().await.unwrap(); - assert_eq!(tx.len(), 2); - rx.next().await.unwrap(); - assert_eq!(tx.len(), 1); - rx.next().await.unwrap(); - assert_eq!(tx.len(), 0); - } - - #[tokio::test] - async fn channel_len_consistent_with_msgs_sink() { - let (mut tx, mut rx) = metered_unbounded::(); - - assert_eq!(tx.len(), 0); - tx.send(1).await.unwrap(); - assert_eq!(tx.len(), 1); - tx.send(2).await.unwrap(); - assert_eq!(tx.len(), 2); - tx.send(3).await.unwrap(); - assert_eq!(tx.len(), 3); - - rx.next().await.unwrap(); - assert_eq!(tx.len(), 2); - rx.next().await.unwrap(); - assert_eq!(tx.len(), 1); - rx.next().await.unwrap(); - assert_eq!(tx.len(), 0); - } - - #[tokio::test] - async fn channel_len_consistent_when_send_parallelised() { - let (tx, _rx) = metered_unbounded::(); - - // Send lots of messages on a bunch of real threads: - let mut join_handles = vec![]; - for _ in 0..50 { - let tx = tx.clone(); - let join_handle = std::thread::spawn(move || { - for i in 0..10000 { - tx.unbounded_send(i).unwrap(); - } - }); - join_handles.push(join_handle); - } - - // When they are done, our len should be accurate: - for handle in join_handles { - handle.join().unwrap(); - } - assert_eq!(tx.len(), 50 * 10_000); - } - - #[tokio::test] - async fn channel_len_consistent_when_send_and_recv_parallelised() { - let (tx, mut rx) = metered_unbounded::(); - - // Send lots of messages on a bunch of real threads: - let mut join_handles = vec![]; - for _ in 0..50 { - let tx = tx.clone(); - let join_handle = std::thread::spawn(move || { - for i in 0..10000 { - tx.unbounded_send(i).unwrap(); - } - }); - join_handles.push(join_handle); - } - - // While this is happenening, we are trying to receive that same number of msgs: - for _ in 0..500_000 { - rx.next().await.unwrap(); - } - - // When they are done, our len should be accurate: - for handle in join_handles { - handle.join().unwrap(); - } - assert_eq!(tx.len(), 0); - } -} diff --git a/backend/common/src/channel/mod.rs b/backend/common/src/channel/mod.rs deleted file mode 100644 index 3b1b209..0000000 --- a/backend/common/src/channel/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod metered_unbounded; - -pub use metered_unbounded::*; diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 690f118..85e14fa 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -15,7 +15,6 @@ // along with this program. If not, see . pub mod byte_size; -pub mod channel; pub mod http_utils; pub mod id_type; pub mod internal_messages; diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 35ca51d..2f1c46e 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -15,6 +15,7 @@ bimap = "0.6.1" bincode = "1.3.3" bytes = "1.0.1" common = { path = "../common" } +flume = "0.10.8" futures = "0.3.15" hex = "0.4.3" http = "0.2.4" diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 7f644c2..98f4935 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -18,7 +18,6 @@ use super::inner_loop; use crate::find_location::find_location; use crate::state::NodeId; use common::id_type; -use futures::channel::mpsc; use futures::{future, Sink, SinkExt}; use std::net::Ipv4Addr; use std::sync::atomic::AtomicU64; @@ -54,17 +53,17 @@ struct AggregatorInternal { /// Send messages in to the aggregator from the outside via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::UnboundedSender, + tx_to_aggregator: flume::Sender, } impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend pub async fn spawn(opts: AggregatorOpts) -> anyhow::Result { - let (tx_to_aggregator, rx_from_external) = mpsc::unbounded(); + let (tx_to_aggregator, rx_from_external) = flume::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests - let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| { - future::ok::<_, mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation( + let tx_to_locator = find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| { + future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation( node_id, msg, )) })); @@ -89,8 +88,8 @@ impl Aggregator { /// in to the aggregator. If nobody is tolding the tx side of the channel /// any more, this task will gracefully end. async fn handle_messages( - rx_from_external: mpsc::UnboundedReceiver, - tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + rx_from_external: flume::Receiver, + tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>, max_queue_len: usize, denylist: Vec, ) { @@ -114,7 +113,7 @@ impl Aggregator { // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: - Box::pin(tx_to_aggregator.with(move |msg| async move { + Box::pin(tx_to_aggregator.into_sink().with(move |msg| async move { Ok(inner_loop::ToAggregator::FromShardWebsocket( shard_conn_id.into(), msg, @@ -141,7 +140,7 @@ impl Aggregator { // but pinning by boxing is the easy solution for now: ( feed_conn_id, - Box::pin(tx_to_aggregator.with(move |msg| async move { + Box::pin(tx_to_aggregator.into_sink().with(move |msg| async move { Ok(inner_loop::ToAggregator::FromFeedWebsocket( feed_conn_id.into(), msg, diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index abfd1fc..88cb402 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -20,13 +20,11 @@ use crate::find_location; use crate::state::{self, NodeId, State}; use bimap::BiMap; use common::{ - channel::metered_unbounded, internal_messages::{self, MuteReason, ShardNodeId}, node_message, node_types::BlockHash, time, }; -use futures::channel::mpsc; use futures::StreamExt; use std::collections::{HashMap, HashSet}; use std::{ @@ -48,7 +46,7 @@ pub enum FromShardWebsocket { /// When the socket is opened, it'll send this first /// so that we have a way to communicate back to it. Initialize { - channel: mpsc::UnboundedSender, + channel: flume::Sender, }, /// Tell the aggregator about a new node. Add { @@ -86,7 +84,7 @@ pub enum FromFeedWebsocket { /// Unbounded so that slow feeds don't block aggregato /// progress. Initialize { - channel: mpsc::UnboundedSender, + channel: flume::Sender, }, /// The feed can subscribe to a chain to receive /// messages relating to it. @@ -135,9 +133,9 @@ pub struct InnerLoop { node_ids: BiMap, /// Keep track of how to send messages out to feeds. - feed_channels: HashMap>, + feed_channels: HashMap>, /// Keep track of how to send messages out to shards. - shard_channels: HashMap>, + shard_channels: HashMap>, /// Which chain is a feed subscribed to? /// Feed Connection ID -> Chain Genesis Hash @@ -150,7 +148,7 @@ pub struct InnerLoop { feed_conn_id_finality: HashSet, /// Send messages here to make geographical location requests. - tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, /// How big can the queue of messages coming in to the aggregator get before messages /// are prioritised and dropped to try and get back on track. @@ -160,7 +158,7 @@ pub struct InnerLoop { impl InnerLoop { /// Create a new inner loop handler with the various state it needs. pub fn new( - tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, denylist: Vec, max_queue_len: usize, ) -> Self { @@ -177,14 +175,16 @@ impl InnerLoop { } } - /// Start handling and responding to incoming messages. Owing to unbounded channels, we actually - /// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop - /// will be able to make progress quickly without any potential yield points. - pub async fn handle(mut self, mut rx_from_external: mpsc::UnboundedReceiver) { + /// Start handling and responding to incoming messages. + pub async fn handle(mut self, rx_from_external: flume::Receiver) { let max_queue_len = self.max_queue_len; - let (metered_tx, mut metered_rx) = metered_unbounded(); + let (metered_tx, metered_rx) = flume::unbounded(); + // Actually handle all of our messages, but before we get here, we + // check the length of the queue below to decide whether or not to + // pass the message on to this. tokio::spawn(async move { + let mut metered_rx = metered_rx.into_stream(); while let Some(msg) = metered_rx.next().await { match msg { ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { @@ -215,9 +215,11 @@ impl InnerLoop { }); }); + let mut rx_from_external = rx_from_external.into_stream(); while let Some(msg) = rx_from_external.next().await { // ignore node updates if we have too many messages to handle, in an attempt - // to reduce the queue length back to something reasonable. + // to reduce the queue length back to something reasonable, lest it get out of + // control and start consuming a load of memory. if metered_tx.len() > max_queue_len { if matches!( msg, @@ -227,7 +229,7 @@ impl InnerLoop { } } - if let Err(e) = metered_tx.unbounded_send(msg) { + if let Err(e) = metered_tx.send(msg) { log::error!("Cannot send message into aggregator: {}", e); break; } @@ -277,7 +279,7 @@ impl InnerLoop { match self.node_state.add_node(genesis_hash, node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute { + let _ = shard_conn.send(ToShardWebsocket::Mute { local_id, reason: MuteReason::ChainNotAllowed, }); @@ -285,7 +287,7 @@ impl InnerLoop { } state::AddNodeResult::ChainOverQuota => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute { + let _ = shard_conn.send(ToShardWebsocket::Mute { local_id, reason: MuteReason::Overquota, }); @@ -326,7 +328,7 @@ impl InnerLoop { // Ask for the grographical location of the node. // Currently we only geographically locate IPV4 addresses so ignore IPV6. if let IpAddr::V4(ip_v4) = ip { - let _ = self.tx_to_locator.unbounded_send((node_id, ip_v4)); + let _ = self.tx_to_locator.send((node_id, ip_v4)); } } } @@ -409,7 +411,7 @@ impl InnerLoop { // Send this to the channel that subscribed: if let Some(bytes) = feed_serializer.into_finalized() { - let _ = channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = channel.send(ToFeedWebsocket::Bytes(bytes)); } } FromFeedWebsocket::Ping { value } => { @@ -422,7 +424,7 @@ impl InnerLoop { let mut feed_serializer = FeedMessageSerializer::new(); feed_serializer.push(feed_message::Pong(&value)); if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } } FromFeedWebsocket::Subscribe { chain } => { @@ -470,7 +472,7 @@ impl InnerLoop { new_chain.finalized_block().hash, )); if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } // If many (eg 10k) nodes are connected, serializing all of their info takes time. @@ -505,7 +507,7 @@ impl InnerLoop { }) .collect(); for bytes in all_feed_messages { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } // Actually make a note of the new chain subsciption: @@ -620,7 +622,7 @@ impl InnerLoop { if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { for &feed_id in feeds { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { - let _ = chan.unbounded_send(message.clone()); + let _ = chan.send(message.clone()); } } } @@ -636,7 +638,7 @@ impl InnerLoop { /// Send a message to everybody. fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) { for chan in self.feed_channels.values_mut() { - let _ = chan.unbounded_send(message.clone()); + let _ = chan.send(message.clone()); } } @@ -662,7 +664,7 @@ impl InnerLoop { // are also subscribed to receive finality updates. for &feed_id in feeds.union(&self.feed_conn_id_finality) { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { - let _ = chan.unbounded_send(message.clone()); + let _ = chan.send(message.clone()); } } } diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index 680a123..326bff1 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -17,7 +17,6 @@ use std::net::Ipv4Addr; use std::sync::Arc; -use futures::channel::mpsc; use futures::{Sink, SinkExt, StreamExt}; use parking_lot::RwLock; use rustc_hash::FxHashMap; @@ -31,12 +30,13 @@ pub type Location = Option>; /// This is responsible for taking an IP address and attempting /// to find a geographical location from this -pub fn find_location(response_chan: R) -> mpsc::UnboundedSender<(Id, Ipv4Addr)> +pub fn find_location(response_chan: R) -> flume::Sender<(Id, Ipv4Addr)> where R: Sink<(Id, Option>)> + Unpin + Send + Clone + 'static, Id: Clone + Send + 'static, { - let (tx, mut rx) = mpsc::unbounded(); + let (tx, rx) = flume::unbounded(); + let mut rx = rx.into_stream(); // cache entries let mut cache: FxHashMap>> = FxHashMap::default(); diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index d96ab9d..c55ade2 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -29,7 +29,7 @@ use bincode::Options; use common::http_utils; use common::internal_messages; use common::ready_chunks_all::ReadyChunksAll; -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; @@ -201,7 +201,8 @@ async fn handle_shard_websocket_connection( where S: futures::Sink + Unpin + Send + 'static, { - let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::unbounded(); + let (tx_to_shard_conn, rx_from_aggregator) = flume::unbounded(); + let mut rx_from_aggregator = rx_from_aggregator.into_stream(); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromShardWebsocket::Initialize { @@ -343,8 +344,8 @@ where S: futures::Sink + Unpin + Send + 'static, { // unbounded channel so that slow feeds don't block aggregator progress: - let (tx_to_feed_conn, rx_from_aggregator) = mpsc::unbounded(); - let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator); + let (tx_to_feed_conn, rx_from_aggregator) = flume::unbounded(); + let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator.into_stream()); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { From 11b0b3a3c7b87d902014e0d3d7ab3f14e072c5fa Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 10:35:11 +0100 Subject: [PATCH 07/22] remove final use of futures::mpsc and replace with flume --- backend/telemetry_core/src/aggregator/aggregator_set.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 5fa991b..113eb15 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -52,8 +52,11 @@ impl AggregatorSet { .map(|a| a.subscribe_shard()) .collect(); + let (tx, rx) = flume::unbounded::(); + let mut rx = rx.into_stream(); + let tx = tx.into_sink(); + // Send every incoming message to all aggregators. - let (tx, mut rx) = futures::channel::mpsc::unbounded::(); tokio::spawn(async move { while let Some(msg) = rx.next().await { for conn in &mut conns { From bd7a21ec39f4b6be6220017d85a05f11fb15c61e Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 11:19:26 +0100 Subject: [PATCH 08/22] Flumify everything --- backend/Cargo.lock | 3 + backend/common/Cargo.toml | 1 + backend/common/src/ws_client/connect.rs | 16 ++--- backend/common/src/ws_client/receiver.rs | 3 +- backend/common/src/ws_client/sender.rs | 50 ++++----------- .../src/aggregator/aggregator_set.rs | 8 +-- .../src/aggregator/inner_loop.rs | 7 +- backend/telemetry_core/src/find_location.rs | 5 +- backend/telemetry_shard/Cargo.toml | 1 + backend/telemetry_shard/src/aggregator.rs | 36 +++++------ backend/telemetry_shard/src/connection.rs | 37 +++++------ backend/telemetry_shard/src/main.rs | 6 +- backend/test_utils/Cargo.toml | 1 + backend/test_utils/src/server/channels.rs | 64 ++----------------- 14 files changed, 73 insertions(+), 165 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index c9235fe..9de9792 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -202,6 +202,7 @@ dependencies = [ "bimap", "bincode", "bytes", + "flume", "fnv", "futures", "hex", @@ -1607,6 +1608,7 @@ dependencies = [ "anyhow", "bincode", "common", + "flume", "futures", "hex", "http", @@ -1644,6 +1646,7 @@ version = "0.1.0" dependencies = [ "anyhow", "common", + "flume", "futures", "http", "log", diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml index a6fcd58..a0ccdfa 100644 --- a/backend/common/Cargo.toml +++ b/backend/common/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0.42" base64 = { default-features = false, features = ["alloc"], version = "0.13" } bimap = "0.6.1" bytes = "1.0.1" +flume = "0.10.8" fnv = "1.0.7" futures = "0.3.15" hex = "0.4.3" diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 0ec6256..787a88b 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -14,8 +14,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . use super::on_close::OnClose; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; use soketto::handshake::{Client, ServerResponse}; use std::sync::Arc; use tokio::net::TcpStream; @@ -73,7 +71,7 @@ impl Connection { let mut rx_closed2 = tx_closed1.subscribe(); // Receive messages from the socket: - let (mut tx_to_external, rx_from_ws) = mpsc::unbounded(); + let (tx_to_external, rx_from_ws) = flume::unbounded(); tokio::spawn(async move { let mut send_to_external = true; loop { @@ -112,7 +110,7 @@ impl Connection { .map_err(|e| e.into()), }; - if let Err(e) = tx_to_external.send(msg).await { + if let Err(e) = tx_to_external.send_async(msg).await { // Our external channel may have closed or errored, but the socket hasn't // been closed, so keep receiving in order to allow the socket to continue to // function properly (we may be happy just sending messages to it), but stop @@ -124,12 +122,12 @@ impl Connection { }); // Send messages to the socket: - let (tx_to_ws, mut rx_from_external) = mpsc::unbounded(); + let (tx_to_ws, rx_from_external) = flume::unbounded::(); tokio::spawn(async move { loop { // Wait for messages, or bail entirely if asked to close. let msg = tokio::select! { - msg = rx_from_external.next() => { msg }, + msg = rx_from_external.recv_async() => { msg }, _ = rx_closed2.recv() => { // attempt to gracefully end the connection. let _ = ws_to_connection.close().await; @@ -141,8 +139,8 @@ impl Connection { // needs to keep receiving data for the WS connection to stay open, there's no // reason to keep this side of the loop open if our channel is closed. let msg = match msg { - None => break, - Some(msg) => msg, + Ok(msg) => msg, + _ => break, }; // We don't explicitly shut down the channel if we hit send errors. Why? Because the @@ -207,7 +205,7 @@ impl Connection { closer: Arc::clone(&on_close), }, Receiver { - inner: rx_from_ws, + inner: rx_from_ws.into_stream(), closer: on_close, }, ) diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index 6f45da7..e1e2397 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -15,13 +15,12 @@ // along with this program. If not, see . use super::on_close::OnClose; -use futures::channel::mpsc; use futures::{Stream, StreamExt}; use std::sync::Arc; /// Receive messages out of a connection pub struct Receiver { - pub(super) inner: mpsc::UnboundedReceiver>, + pub(super) inner: flume::r#async::RecvStream<'static, Result>, pub(super) closer: Arc, } diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 45c3d66..9cb8ee7 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -15,8 +15,6 @@ // along with this program. If not, see . use super::on_close::OnClose; -use futures::channel::mpsc; -use futures::{Sink, SinkExt}; use std::sync::Arc; /// A message that can be sent into the channel interface @@ -41,62 +39,36 @@ pub enum SentMessage { /// Send messages into the connection #[derive(Clone)] pub struct Sender { - pub(super) inner: mpsc::UnboundedSender, + pub(super) inner: flume::Sender, pub(super) closer: Arc, } impl Sender { /// Ask the underlying Websocket connection to close. - pub async fn close(&mut self) -> Result<(), SendError> { + pub async fn close(&mut self) -> Result<(), SendError> { self.closer.0.send(()).map_err(|_| SendError::CloseError)?; Ok(()) } /// Returns whether this channel is closed. pub fn is_closed(&self) -> bool { - self.inner.is_closed() + self.inner.is_disconnected() } /// Unbounded send will always queue the message and doesn't /// need to be awaited. - pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), SendError> { - self.inner - .unbounded_send(msg) - .map_err(|e| e.into_send_error())?; + pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), flume::SendError> { + self.inner.send(msg)?; Ok(()) } + /// Convert this sender into a Sink + pub fn into_sink(self) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { + self.inner.into_sink() + } } #[derive(thiserror::Error, Debug, Clone)] -pub enum SendError { +pub enum SendError { #[error("Failed to send message: {0}")] - ChannelError(#[from] mpsc::SendError), + ChannelError(#[from] flume::SendError), #[error("Failed to send close message")] CloseError, } - -impl Sink for Sender { - type Error = SendError; - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_ready_unpin(cx).map_err(|e| e.into()) - } - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: SentMessage, - ) -> Result<(), Self::Error> { - self.inner.start_send_unpin(item).map_err(|e| e.into()) - } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_flush_unpin(cx).map_err(|e| e.into()) - } - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_close_unpin(cx).map_err(|e| e.into()) - } -} diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 113eb15..885c119 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -1,7 +1,7 @@ use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; -use futures::{Sink, SinkExt, StreamExt}; +use futures::{Sink, SinkExt}; use inner_loop::FromShardWebsocket; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -53,12 +53,10 @@ impl AggregatorSet { .collect(); let (tx, rx) = flume::unbounded::(); - let mut rx = rx.into_stream(); - let tx = tx.into_sink(); // Send every incoming message to all aggregators. tokio::spawn(async move { - while let Some(msg) = rx.next().await { + while let Ok(msg) = rx.recv_async().await { for conn in &mut conns { // Unbounded channel under the hood, so this await // shouldn't ever need to yield. @@ -70,7 +68,7 @@ impl AggregatorSet { } }); - EitherSink::b(tx.sink_map_err(|e| anyhow::anyhow!("{}", e))) + EitherSink::b(tx.into_sink().sink_map_err(|e| anyhow::anyhow!("{}", e))) } /// Return a sink that a feed can send messages into to be handled by a single aggregator. diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 88cb402..9ae511e 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -25,7 +25,6 @@ use common::{ node_types::BlockHash, time, }; -use futures::StreamExt; use std::collections::{HashMap, HashSet}; use std::{ net::{IpAddr, Ipv4Addr}, @@ -184,8 +183,7 @@ impl InnerLoop { // check the length of the queue below to decide whether or not to // pass the message on to this. tokio::spawn(async move { - let mut metered_rx = metered_rx.into_stream(); - while let Some(msg) = metered_rx.next().await { + while let Ok(msg) = metered_rx.recv_async().await { match msg { ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { self.handle_from_feed(feed_conn_id, msg) @@ -215,8 +213,7 @@ impl InnerLoop { }); }); - let mut rx_from_external = rx_from_external.into_stream(); - while let Some(msg) = rx_from_external.next().await { + while let Ok(msg) = rx_from_external.recv_async().await { // ignore node updates if we have too many messages to handle, in an attempt // to reduce the queue length back to something reasonable, lest it get out of // control and start consuming a load of memory. diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index 326bff1..f020747 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -17,7 +17,7 @@ use std::net::Ipv4Addr; use std::sync::Arc; -use futures::{Sink, SinkExt, StreamExt}; +use futures::{Sink, SinkExt}; use parking_lot::RwLock; use rustc_hash::FxHashMap; use serde::Deserialize; @@ -36,7 +36,6 @@ where Id: Clone + Send + 'static, { let (tx, rx) = flume::unbounded(); - let mut rx = rx.into_stream(); // cache entries let mut cache: FxHashMap>> = FxHashMap::default(); @@ -61,7 +60,7 @@ where let semaphore = Arc::new(Semaphore::new(4)); loop { - while let Some((id, ip_address)) = rx.next().await { + while let Ok((id, ip_address)) = rx.recv_async().await { let permit = semaphore.clone().acquire_owned().await.unwrap(); let mut response_chan = response_chan.clone(); let locator = locator.clone(); diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index 2f9d505..370d739 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -9,6 +9,7 @@ license = "GPL-3.0" anyhow = "1.0.41" bincode = "1.3.3" common = { path = "../common" } +flume = "0.10.8" futures = "0.3.15" hex = "0.4.3" http = "0.2.4" diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 1200240..50ded1d 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -21,8 +21,7 @@ use common::{ node_types::BlockHash, AssignId, }; -use futures::channel::mpsc; -use futures::{Sink, SinkExt, StreamExt}; +use futures::{Sink, SinkExt}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -60,7 +59,7 @@ pub enum FromWebsocket { /// the websocket connection and force the node to reconnect /// so that it sends its system info again incase the telemetry /// core has restarted. - close_connection: mpsc::Sender<()>, + close_connection: flume::Sender<()>, }, /// Tell the aggregator about a new node. Add { @@ -94,28 +93,28 @@ struct AggregatorInternal { /// Send messages to the aggregator from websockets via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::Sender, + tx_to_aggregator: flume::Sender, } impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result { - let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); + let (tx_to_aggregator, rx_from_external) = flume::bounded(10); // Establish a resiliant connection to the core (this retries as needed): - let (tx_to_telemetry_core, mut rx_from_telemetry_core) = + let (tx_to_telemetry_core, rx_from_telemetry_core) = create_ws_connection_to_core(telemetry_uri).await; // Forward messages from the telemetry core into the aggregator: - let mut tx_to_aggregator2 = tx_to_aggregator.clone(); + let tx_to_aggregator2 = tx_to_aggregator.clone(); tokio::spawn(async move { - while let Some(msg) = rx_from_telemetry_core.next().await { + while let Ok(msg) = rx_from_telemetry_core.recv_async().await { let msg_to_aggregator = match msg { Message::Connected => ToAggregator::ConnectedToTelemetryCore, Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, Message::Data(data) => ToAggregator::FromTelemetryCore(data), }; - if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await { + if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await { // This will close the ws channels, which themselves log messages. break; } @@ -139,8 +138,8 @@ impl Aggregator { // in to the aggregator. If nobody is holding the tx side of the channel // any more, this task will gracefully end. async fn handle_messages( - mut rx_from_external: mpsc::Receiver, - mut tx_to_telemetry_core: mpsc::Sender, + rx_from_external: flume::Receiver, + tx_to_telemetry_core: flume::Sender, ) { use internal_messages::{FromShardAggregator, FromTelemetryCore}; @@ -150,7 +149,7 @@ impl Aggregator { // A list of close channels for the currently connected substrate nodes. Send an empty // tuple to these to ask the connections to be closed. - let mut close_connections: HashMap> = HashMap::new(); + let mut close_connections: HashMap> = HashMap::new(); // Maintain mappings from the connection ID and node message ID to the "local ID" which we // broadcast to the telemetry core. @@ -160,15 +159,15 @@ impl Aggregator { let mut muted: HashSet = HashSet::new(); // Now, loop and receive messages to handle. - while let Some(msg) = rx_from_external.next().await { + while let Ok(msg) = rx_from_external.recv_async().await { match msg { ToAggregator::ConnectedToTelemetryCore => { // Take hold of the connection closers and run them all. let closers = close_connections; - for (_, mut closer) in closers { + for (_, closer) in closers { // if this fails, it probably means the connection has died already anyway. - let _ = closer.send(()).await; + let _ = closer.send_async(()).await; } // We've told everything to disconnect. Now, reset our state: @@ -212,7 +211,7 @@ impl Aggregator { // Send the message to the telemetry core with this local ID: let _ = tx_to_telemetry_core - .send(FromShardAggregator::AddNode { + .send_async(FromShardAggregator::AddNode { ip, node, genesis_hash, @@ -245,7 +244,7 @@ impl Aggregator { // Send the message to the telemetry core with this local ID: let _ = tx_to_telemetry_core - .send(FromShardAggregator::UpdateNode { local_id, payload }) + .send_async(FromShardAggregator::UpdateNode { local_id, payload }) .await; } ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => { @@ -264,7 +263,7 @@ impl Aggregator { to_local_id.remove_by_id(local_id); muted.remove(&local_id); let _ = tx_to_telemetry_core - .send(FromShardAggregator::RemoveNode { local_id }) + .send_async(FromShardAggregator::RemoveNode { local_id }) .await; } } @@ -293,6 +292,7 @@ impl Aggregator { // but pinning by boxing is the easy solution for now: Box::pin( tx_to_aggregator + .into_sink() .with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }), ) } diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs index 4ebf39d..4b24bee 100644 --- a/backend/telemetry_shard/src/connection.rs +++ b/backend/telemetry_shard/src/connection.rs @@ -16,8 +16,7 @@ use bincode::Options; use common::ws_client; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; +use futures::StreamExt; #[derive(Clone, Debug)] pub enum Message { @@ -36,13 +35,13 @@ pub enum Message { /// between aggregator and core. pub async fn create_ws_connection_to_core( telemetry_uri: http::Uri, -) -> (mpsc::Sender, mpsc::Receiver>) +) -> (flume::Sender, flume::Receiver>) where In: serde::Serialize + Send + 'static, Out: serde::de::DeserializeOwned + Send + 'static, { - let (tx_in, mut rx_in) = mpsc::channel(10); - let (mut tx_out, rx_out) = mpsc::channel(10); + let (tx_in, rx_in) = flume::bounded::(10); + let (tx_out, rx_out) = flume::bounded(10); let mut is_connected = false; @@ -51,7 +50,7 @@ where // Throw away any pending messages from the incoming channel so that it // doesn't get filled up and begin blocking while we're looping and waiting // for a reconnection. - while let Ok(Some(_)) = rx_in.try_next() {} + while let Ok(_) = rx_in.try_recv() {} // Try to connect. If connection established, we serialize and forward messages // to/from the core. If the external channels break, we end for good. If the internal @@ -60,9 +59,9 @@ where Ok(connection) => { let (tx_to_core, mut rx_from_core) = connection.into_channels(); is_connected = true; - let mut tx_out = tx_out.clone(); + let tx_out = tx_out.clone(); - if let Err(e) = tx_out.send(Message::Connected).await { + if let Err(e) = tx_out.send_async(Message::Connected).await { // If receiving end is closed, bail now. log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); return; @@ -73,35 +72,31 @@ where tokio::select! { msg = rx_from_core.next() => { let msg = match msg { - Some(msg) => msg, + Some(Ok(msg)) => msg, // No more messages from core? core WS is disconnected. - None => { + _ => { log::warn!("No more messages from core: shutting down connection (will reconnect)"); break } }; let bytes = match msg { - Ok(ws_client::RecvMessage::Binary(bytes)) => bytes, - Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(), - Err(e) => { - log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e); - break; - } + ws_client::RecvMessage::Binary(bytes) => bytes, + ws_client::RecvMessage::Text(s) => s.into_bytes() }; let msg = bincode::options() .deserialize(&bytes) .expect("internal messages must be deserializable"); - if let Err(e) = tx_out.send(Message::Data(msg)).await { + if let Err(e) = tx_out.send_async(Message::Data(msg)).await { log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); return; } }, - msg = rx_in.next() => { + msg = rx_in.recv_async() => { let msg = match msg { - Some(msg) => msg, - None => { + Ok(msg) => msg, + Err(flume::RecvError::Disconnected) => { log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)"); return } @@ -131,7 +126,7 @@ where if is_connected { is_connected = false; - if let Err(e) = tx_out.send(Message::Disconnected).await { + if let Err(e) = tx_out.send_async(Message::Disconnected).await { log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); return; } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 4e62d76..86f632c 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -29,7 +29,7 @@ use common::byte_size::ByteSize; use common::http_utils; use common::node_message; use common::rolling_total::RollingTotalBuilder; -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::SinkExt; use http::Uri; use hyper::{Method, Response}; use simple_logger::SimpleLogger; @@ -203,7 +203,7 @@ where // This could be a oneshot channel, but it's useful to be able to clone // messages, and we can't clone oneshot channel senders. - let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0); + let (close_connection_tx, close_connection_rx) = flume::bounded(1); // Tell the aggregator about this new connection, and give it a way to close this connection: let init_msg = FromWebsocket::Initialize { @@ -223,7 +223,7 @@ where tokio::select! { // The close channel has fired, so end the loop. `ws_recv.receive_data` is // *not* cancel safe, but since we're closing the connection we don't care. - _ = close_connection_rx.next() => { + _ = close_connection_rx.recv_async() => { log::info!("connection to {:?} being closed by aggregator", real_addr); break }, diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml index e3800c7..7dccdae 100644 --- a/backend/test_utils/Cargo.toml +++ b/backend/test_utils/Cargo.toml @@ -18,3 +18,4 @@ tokio = { version = "1.7.1", features = ["full"] } tokio-util = { version = "0.6.7", features = ["full"] } common = { path = "../common" } time = { version = "0.3.0", features = ["formatting"] } +flume = "0.10.8" diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index f130f7e..4ca9274 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -21,7 +21,7 @@ use std::{ use crate::feed_message_de::FeedMessage; use common::ws_client; -use futures::{Sink, SinkExt, Stream, StreamExt}; +use futures::{Stream, StreamExt}; /// Wrap a `ws_client::Sender` with convenient utility methods for shard connections pub struct ShardSender(ws_client::Sender); @@ -32,45 +32,17 @@ impl From for ShardSender { } } -impl Sink for ShardSender { - type Error = ws_client::SendError; - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_ready_unpin(cx) - } - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: ws_client::SentMessage, - ) -> Result<(), Self::Error> { - self.0.start_send_unpin(item) - } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_flush_unpin(cx) - } - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_close_unpin(cx) - } -} - impl ShardSender { /// Send JSON as a binary websocket message pub fn send_json_binary( &mut self, json: serde_json::Value, - ) -> Result<(), ws_client::SendError> { + ) -> Result<(), flume::SendError> { let bytes = serde_json::to_vec(&json).expect("valid bytes"); self.unbounded_send(ws_client::SentMessage::Binary(bytes)) } /// Send JSON as a textual websocket message - pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { + pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), flume::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.unbounded_send(ws_client::SentMessage::Text(s)) } @@ -128,34 +100,6 @@ impl From for FeedSender { } } -impl Sink for FeedSender { - type Error = ws_client::SendError; - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_ready_unpin(cx) - } - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: ws_client::SentMessage, - ) -> Result<(), Self::Error> { - self.0.start_send_unpin(item) - } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_flush_unpin(cx) - } - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_close_unpin(cx) - } -} - impl Deref for FeedSender { type Target = ws_client::Sender; fn deref(&self) -> &Self::Target { @@ -176,7 +120,7 @@ impl FeedSender { &self, command: S, param: S, - ) -> Result<(), ws_client::SendError> { + ) -> Result<(), flume::SendError> { self.unbounded_send(ws_client::SentMessage::Text(format!( "{}:{}", command.as_ref(), From e3fcd4e8c2b7a68ef96ee0875735abd1c8c8cf92 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 11:47:54 +0100 Subject: [PATCH 09/22] Clean up soak test runner and add more config options --- backend/telemetry_core/tests/soak_tests.rs | 248 +++------------------ 1 file changed, 36 insertions(+), 212 deletions(-) diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 78b3dbe..a49be8b 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -34,207 +34,17 @@ In general, if you run into issues, it may be better to run this on a linux box; MacOS seems to hit limits quicker in general. */ -use common::node_types::BlockHash; use common::ws_client::SentMessage; use futures::{future, StreamExt}; -use serde_json::json; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts}; -/// A configurable soak_test runner. Configure by providing the expected args as -/// an environment variable. One example to run this test is: -/// -/// ```sh -/// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture -/// ``` -/// -/// You can also run this test against the pre-sharding actix binary with something like this: -/// ```sh -/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture -/// ``` -/// -/// Or, you can run it against existing processes with something like this: -/// ```sh -/// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture -/// ``` -/// -/// Each will establish the same total number of connections and send the same messages. -#[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -pub async fn soak_test() { - let opts = get_soak_test_opts(); - run_soak_test(opts).await; -} - -/// A general soak test runner. -/// This test sends the same message over and over, and so -/// the results should be pretty reproducible. -async fn run_soak_test(opts: SoakTestOpts) { - let mut server = start_server( - ServerOpts { - release_mode: true, - log_output: opts.log_output, - ..Default::default() - }, - CoreOpts { - worker_threads: opts.core_worker_threads, - ..Default::default() - }, - ShardOpts { - worker_threads: opts.shard_worker_threads, - ..Default::default() - }, - ) - .await; - println!("Telemetry core running at {}", server.get_core().host()); - - // Start up the shards we requested: - let mut shard_ids = vec![]; - for _ in 0..opts.shards { - let shard_id = server.add_shard().await.expect("shard can't be added"); - shard_ids.push(shard_id); - } - - // Connect nodes to each shard: - let mut nodes = vec![]; - for &shard_id in &shard_ids { - let mut conns = server - .get_shard(shard_id) - .unwrap() - .connect_multiple_nodes(opts.nodes) - .await - .expect("node connections failed"); - nodes.append(&mut conns); - } - - // Each node tells the shard about itself: - for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { - node_tx - .send_json_binary(json!({ - "id":1, // Only needs to be unique per node - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain": "Polkadot", // <- so that we don't go over quota with lots of nodes. - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Node #{}", idx), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - })) - .unwrap(); - } - - // Connect feeds to the core: - let mut feeds = server - .get_core() - .connect_multiple_feeds(opts.feeds) - .await - .expect("feed connections failed"); - - // Every feed subscribes to the chain above to recv messages about it: - for (feed_tx, _) in &mut feeds { - feed_tx.send_command("subscribe", "Polkadot").unwrap(); - } - - // Start sending "update" messages from nodes at time intervals. - let bytes_in = Arc::new(AtomicUsize::new(0)); - let bytes_in2 = Arc::clone(&bytes_in); - tokio::task::spawn(async move { - let msg = json!({ - "id":1, - "payload":{ - "bandwidth_download":576, - "bandwidth_upload":576, - "msg":"system.interval", - "peers":1 - }, - "ts":"2021-07-12T10:37:48.330433+01:00" - }); - let msg_bytes: &'static [u8] = Box::new(serde_json::to_vec(&msg).unwrap()).leak(); - - loop { - // every ~1second we aim to have sent messages from all of the nodes. So we cycle through - // the node IDs and send a message from each at roughly 1s / number_of_nodes. - let mut interval = - tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64)); - - for node_id in (0..nodes.len()).cycle() { - interval.tick().await; - let node_tx = &mut nodes[node_id].0; - node_tx - .unbounded_send(SentMessage::StaticBinary(msg_bytes)) - .unwrap(); - bytes_in2.fetch_add(msg_bytes.len(), Ordering::Relaxed); - } - } - }); - - // Also start receiving messages, counting the bytes received so far. - let bytes_out = Arc::new(AtomicUsize::new(0)); - let msgs_out = Arc::new(AtomicUsize::new(0)); - for (_, mut feed_rx) in feeds { - let bytes_out = Arc::clone(&bytes_out); - let msgs_out = Arc::clone(&msgs_out); - tokio::task::spawn(async move { - while let Some(msg) = feed_rx.next().await { - let msg = msg.expect("message could be received"); - let num_bytes = msg.len(); - bytes_out.fetch_add(num_bytes, Ordering::Relaxed); - msgs_out.fetch_add(1, Ordering::Relaxed); - } - eprintln!("Error: feed has been closed unexpectedly"); - }); - } - - // Periodically report on bytes out - tokio::task::spawn(async move { - let one_mb = 1024.0 * 1024.0; - let mut last_bytes_in = 0; - let mut last_bytes_out = 0; - let mut last_msgs_out = 0; - let mut n = 1; - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - let bytes_in_val = bytes_in.load(Ordering::Relaxed); - let bytes_out_val = bytes_out.load(Ordering::Relaxed); - let msgs_out_val = msgs_out.load(Ordering::Relaxed); - - println!( - "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})", - n, - (bytes_in_val - last_bytes_in) as f64 / one_mb, - (bytes_out_val - last_bytes_out) as f64 / one_mb, - bytes_in_val, - bytes_out_val, - (msgs_out_val - last_msgs_out), - msgs_out_val - ); - - n += 1; - last_bytes_in = bytes_in_val; - last_bytes_out = bytes_out_val; - last_msgs_out = msgs_out_val; - } - }); - - // Wait forever. - future::pending().await -} - -/// Identical to `soak_test`, except that we try to send realistic messages from fake nodes. -/// This means it's potentially less reproducable, but presents a more accurate picture of -/// the load, and lets us see the UI working more or less. -/// -/// We can provide the same arguments as we would to `soak_test`: +/// A test runner which sends realistic(ish) messages from fake nodes to a telemetry server. /// +/// To start up 4 telemetry_shards and 1 telemetry_core with 10 feeds and 100 nodes: /// ```sh /// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture /// ``` @@ -244,16 +54,22 @@ async fn run_soak_test(opts: SoakTestOpts) { /// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture /// ``` /// -/// Or, you can run it against existing processes with something like this: +/// Or, you can run it against existing processes on the network with something like this: /// ```sh /// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture /// ``` /// #[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -pub async fn realistic_soak_test() { +#[test] +pub fn soak_test() { let opts = get_soak_test_opts(); - run_realistic_soak_test(opts).await; + + tokio::runtime::Builder::new_multi_thread() + .worker_threads(opts.test_worker_threads) + .thread_name("telemetry_test_runner") + .build() + .unwrap() + .block_on(run_realistic_soak_test(opts)); } /// A general soak test runner. @@ -300,28 +116,30 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { // Start nodes talking to the shards: let bytes_in = Arc::new(AtomicUsize::new(0)); + let ids_per_node = opts.ids_per_node; for node in nodes.into_iter().enumerate() { let bytes_in = Arc::clone(&bytes_in); tokio::spawn(async move { let (idx, (tx, _)) = node; + for id in 0..ids_per_node { + let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( + Duration::from_secs(3), + format!("Node {}", idx + 1), + "Polkadot".to_owned(), + id + 1, + ); - let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( - Duration::from_secs(3), - format!("Node {}", idx + 1), - "Polkadot".to_owned(), - idx + 1, - ); + let res = telemetry + .start(|msg| async { + bytes_in.fetch_add(msg.len(), Ordering::Relaxed); + tx.unbounded_send(SentMessage::Binary(msg))?; + Ok::<_, anyhow::Error>(()) + }) + .await; - let res = telemetry - .start(|msg| async { - bytes_in.fetch_add(msg.len(), Ordering::Relaxed); - tx.unbounded_send(SentMessage::Binary(msg))?; - Ok::<_, anyhow::Error>(()) - }) - .await; - - if let Err(e) = res { - log::error!("Telemetry Node #{} has died with error: {}", idx, e); + if let Err(e) = res { + log::error!("Telemetry Node #{} has died with error: {}", idx, e); + } } }); } @@ -404,6 +222,9 @@ struct SoakTestOpts { /// The number of nodes to connect to each feed #[structopt(long)] nodes: usize, + /// The number of different virtual nodes to connect per actual node socket connection + #[structopt(long, default_value = "1")] + ids_per_node: usize, /// Number of aggregator loops to use in the core #[structopt(long)] core_num_aggregators: Option, @@ -416,6 +237,9 @@ struct SoakTestOpts { /// Should we log output from the core/shards to stdout? #[structopt(long)] log_output: bool, + /// How many worker threads should the soak test runner use? + #[structopt(long, default_value = "4")] + test_worker_threads: usize } /// Get soak test args from an envvar and parse them via structopt. From 20463ce1592da1bb907b4421fb72a91335ed6f52 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 11:49:40 +0100 Subject: [PATCH 10/22] test runner: enable tokio features --- backend/telemetry_core/tests/soak_tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index a49be8b..cd1f394 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -66,6 +66,7 @@ pub fn soak_test() { tokio::runtime::Builder::new_multi_thread() .worker_threads(opts.test_worker_threads) + .enable_all() .thread_name("telemetry_test_runner") .build() .unwrap() From f72f8c1fd56de2cde76a3dfa36e75eb5beec005c Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 11:54:14 +0100 Subject: [PATCH 11/22] test runner: fix soak test for multiple ids per ndoe --- backend/telemetry_core/tests/soak_tests.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index cd1f394..da7e93b 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -119,10 +119,12 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { let bytes_in = Arc::new(AtomicUsize::new(0)); let ids_per_node = opts.ids_per_node; for node in nodes.into_iter().enumerate() { - let bytes_in = Arc::clone(&bytes_in); - tokio::spawn(async move { - let (idx, (tx, _)) = node; - for id in 0..ids_per_node { + let (idx, (tx, _)) = node; + for id in 0..ids_per_node { + let bytes_in = Arc::clone(&bytes_in); + let tx = tx.clone(); + + tokio::spawn(async move { let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( Duration::from_secs(3), format!("Node {}", idx + 1), @@ -141,8 +143,8 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { if let Err(e) = res { log::error!("Telemetry Node #{} has died with error: {}", idx, e); } - } - }); + }); + } } // Connect feeds to the core: From 3319709f7be7f5d20748cb0e755a9b5a70f687d9 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 14:52:28 +0100 Subject: [PATCH 12/22] Add periodic interval to core loop and print debug info --- .../src/aggregator/inner_loop.rs | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 9ae511e..4a1e314 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -37,6 +37,10 @@ pub enum ToAggregator { FromShardWebsocket(ConnId, FromShardWebsocket), FromFeedWebsocket(ConnId, FromFeedWebsocket), FromFindLocation(NodeId, find_location::Location), + /// This message is sent periodically and allows us to monitor + /// or cleanup things in our inner loop. The channel provided + /// is notified when the interval has been handled. + Interval(flume::Sender<()>), } /// An incoming shard connection can send these messages to the aggregator. @@ -193,11 +197,31 @@ impl InnerLoop { } ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location) + }, + ToAggregator::Interval(tx) => { + self.handle_interval(tx) } } } }); + // Periodically send interval messages for cleanup/monitoring. At most 1 + // every 60 seconds, but if the message queue is backed up it may take longer. + tokio::spawn({ + let metered_tx = metered_tx.clone(); + async move { + loop { + let now = tokio::time::Instant::now(); + let (tx, rx) = flume::unbounded(); + + let _ = metered_tx.send_async(ToAggregator::Interval(tx)).await; + let _ = rx.recv_async().await; + + tokio::time::sleep_until(now + tokio::time::Duration::from_secs(60)).await; + } + } + }); + // TEMP: let's monitor message queue len out of interest let tx_len = metered_tx.clone(); std::thread::spawn(move || { @@ -233,6 +257,39 @@ impl InnerLoop { } } + /// The periodic interval message gives us a chance to do some tidy up or monitoring. + fn handle_interval(&mut self, rx: flume::Sender<()>) { + + let node_ids = self.node_ids.len(); + let node_count: usize = self.node_state + .iter_chains() + .map(|c| c.node_count()) + .sum(); + + let shard_cound = self.shard_channels.len(); + let connected_feeds = self.feed_channels.len(); + let finality_feeds = self.feed_conn_id_finality.len(); + let feed_to_chain = self.feed_conn_id_to_chain.len(); + + let num_subscribed_chains = self.chain_to_feed_conn_ids.len(); + let num_subscribed_chain_feeds: usize = self.chain_to_feed_conn_ids + .values() + .map(|c| c.len()) + .sum(); + + println!("Periodic update at {:?}:", std::time::SystemTime::now()); + dbg!(node_ids); + dbg!(node_count); + dbg!(shard_cound); + dbg!(connected_feeds); + dbg!(finality_feeds); + dbg!(feed_to_chain); + dbg!(num_subscribed_chains); + dbg!(num_subscribed_chain_feeds); + + drop(rx); + } + /// Handle messages that come from the node geographical locator. fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) { self.node_state From ab2303ce5cdc68157c2dc80bd87ca9ed8434d63e Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 16:49:04 +0100 Subject: [PATCH 13/22] more diagnostic logging --- backend/telemetry_core/src/aggregator/inner_loop.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 4a1e314..bdb3c94 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -277,6 +277,11 @@ impl InnerLoop { .map(|c| c.len()) .sum(); + let num_messages_to_feeds = self.feed_channels + .values() + .map(|c| c.len()) + .sum(); + println!("Periodic update at {:?}:", std::time::SystemTime::now()); dbg!(node_ids); dbg!(node_count); @@ -286,6 +291,7 @@ impl InnerLoop { dbg!(feed_to_chain); dbg!(num_subscribed_chains); dbg!(num_subscribed_chain_feeds); + dbg!(num_messages_to_feeds); drop(rx); } From 6db7f484ef5a6bd27a76e9114f414ac0dab662ca Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 16:51:09 +0100 Subject: [PATCH 14/22] Fix compile err with diagnostic msg --- backend/telemetry_core/src/aggregator/inner_loop.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index bdb3c94..6598bfa 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -277,7 +277,7 @@ impl InnerLoop { .map(|c| c.len()) .sum(); - let num_messages_to_feeds = self.feed_channels + let num_messages_to_feeds: usize = self.feed_channels .values() .map(|c| c.len()) .sum(); From 4f7b2c8ec59587ba11908bec975dd0719f389797 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 10 Aug 2021 17:16:33 +0100 Subject: [PATCH 15/22] Confirm that densemap len wont panic if lots of retired items --- backend/common/src/dense_map.rs | 34 +++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index f3ee90f..3dfd420 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -131,3 +131,37 @@ where } } } + +#[cfg(test)] +mod test { + + use super::*; + + #[test] + fn len_doesnt_panic_if_lots_of_retired() { + + let mut map = DenseMap::::new(); + + let id1 = map.add(1); + let id2 = map.add(2); + let id3 = map.add(3); + + assert_eq!(map.len(), 3); + + map.remove(id1); + map.remove(id2); + + assert_eq!(map.len(), 1); + + map.remove(id3); + + assert_eq!(map.len(), 0); + + map.remove(id1); + map.remove(id1); + map.remove(id1); + + assert_eq!(map.len(), 0); + } + +} \ No newline at end of file From 92da674d4d3f4e57846942a3bba4c38b8c98286f Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 11 Aug 2021 15:58:48 +0100 Subject: [PATCH 16/22] Expose metrics in a format that prometheus understands --- .../src/aggregator/aggregator.rs | 11 ++ .../src/aggregator/aggregator_set.rs | 57 ++++++++- .../src/aggregator/inner_loop.rs | 119 ++++++++---------- backend/telemetry_core/src/main.rs | 31 ++++- 4 files changed, 141 insertions(+), 77 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 98f4935..8ba8cc2 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -98,6 +98,17 @@ impl Aggregator { .await; } + /// Gather metrics from our aggregator loop + pub async fn gather_metrics(&self) -> anyhow::Result { + let (tx, rx) = flume::unbounded(); + let msg = inner_loop::ToAggregator::GatherMetrics(tx); + + self.0.tx_to_aggregator.send_async(msg).await?; + + let metrics = rx.recv_async().await?; + Ok(metrics) + } + /// Return a sink that a shard can send messages into to be handled by the aggregator. pub fn subscribe_shard( &self, diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 885c119..8fb6251 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -2,9 +2,9 @@ use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; use futures::{Sink, SinkExt}; -use inner_loop::FromShardWebsocket; +use inner_loop::{ Metrics, FromShardWebsocket }; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{ Arc, Mutex }; #[derive(Clone)] pub struct AggregatorSet(Arc); @@ -12,6 +12,7 @@ pub struct AggregatorSet(Arc); pub struct AggregatorSetInner { aggregators: Vec, next_idx: AtomicUsize, + metrics: Mutex> } impl AggregatorSet { @@ -27,10 +28,58 @@ impl AggregatorSet { ) .await?; - Ok(AggregatorSet(Arc::new(AggregatorSetInner { + let initial_metrics = (0..num_aggregators) + .map(|_| Metrics::default()) + .collect(); + + let this = AggregatorSet(Arc::new(AggregatorSetInner { aggregators, next_idx: AtomicUsize::new(0), - }))) + metrics: Mutex::new(initial_metrics) + })); + + // Start asking for metrics: + this.spawn_metrics_loops(); + + Ok(this) + } + + /// Spawn loops which periodically ask for metrics from each internal aggregator. + /// Depending on how busy the aggregators are, these metrics won't necessarily be in + /// sync with each other. + fn spawn_metrics_loops(&self) { + let aggregators = self.0.aggregators.clone(); + for (idx, a) in aggregators.into_iter().enumerate() { + let inner = Arc::clone(&self.0); + tokio::spawn(async move { + loop { + let now = tokio::time::Instant::now(); + let metrics = match a.gather_metrics().await { + Ok(metrics) => metrics, + // Any error here is unlikely and probably means that the aggregator + // loop has failed completely. + Err(e) => { + log::error!("Error obtaining metrics (bailing): {}", e); + return + } + }; + + // Lock, update the stored metrics and drop the lock immediately. + // We discard any error; if somethign went wrong talking to the inner loop, + // it's probably a fatal error + { inner.metrics.lock().unwrap()[idx] = metrics; } + + // Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll + // end up waiting longer between requests. + tokio::time::sleep_until(now + tokio::time::Duration::from_secs(10)).await; + } + }); + } + } + + /// Return the latest metrics we've gathered so far from each internal aggregator. + pub fn latest_metrics(&self) -> Vec { + self.0.metrics.lock().unwrap().clone() } /// Return a sink that a shard can send messages into to be handled by all aggregators. diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 6598bfa..72e468b 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -37,10 +37,9 @@ pub enum ToAggregator { FromShardWebsocket(ConnId, FromShardWebsocket), FromFeedWebsocket(ConnId, FromFeedWebsocket), FromFindLocation(NodeId, find_location::Location), - /// This message is sent periodically and allows us to monitor - /// or cleanup things in our inner loop. The channel provided - /// is notified when the interval has been handled. - Interval(flume::Sender<()>), + /// Hand back some metrics. The provided sender is expected not to block when + /// a message it sent into it. + GatherMetrics(flume::Sender), } /// An incoming shard connection can send these messages to the aggregator. @@ -102,6 +101,30 @@ pub enum FromFeedWebsocket { Disconnected, } +/// A set of metrics returned when we ask for metrics +#[derive(Clone,Debug,Default)] +pub struct Metrics { + /// When in unix MS from epoch were these metrics obtained + pub timestamp_unix_ms: u64, + /// How many chains are feeds currently subscribed to. + pub chains_subscribed_to: usize, + /// How many feeds are currently subscribed to something. + pub subscribed_feeds: usize, + /// How many feeds have asked for finality information, too. + pub subscribed_finality_feeds: usize, + /// How many messages are currently queued up in internal channels + /// waiting to be sent out to feeds. + pub total_messages_to_feeds: usize, + /// How many messages are queued waiting to be handled by this aggregator. + pub total_messages_to_aggregator: usize, + /// How many nodes are currently known about by this aggregator. + pub connected_nodes: usize, + /// How many feeds are currently connected to this aggregator. + pub connected_feeds: usize, + /// How many shards are currently connected to this aggregator. + pub connected_shards: usize +} + // The frontend sends text based commands; parse them into these messages: impl FromStr for FromFeedWebsocket { type Err = anyhow::Error; @@ -198,45 +221,13 @@ impl InnerLoop { ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location) }, - ToAggregator::Interval(tx) => { - self.handle_interval(tx) + ToAggregator::GatherMetrics(tx) => { + self.handle_gather_metrics(tx, metered_rx.len()) } } } }); - // Periodically send interval messages for cleanup/monitoring. At most 1 - // every 60 seconds, but if the message queue is backed up it may take longer. - tokio::spawn({ - let metered_tx = metered_tx.clone(); - async move { - loop { - let now = tokio::time::Instant::now(); - let (tx, rx) = flume::unbounded(); - - let _ = metered_tx.send_async(ToAggregator::Interval(tx)).await; - let _ = rx.recv_async().await; - - tokio::time::sleep_until(now + tokio::time::Duration::from_secs(60)).await; - } - } - }); - - // TEMP: let's monitor message queue len out of interest - let tx_len = metered_tx.clone(); - std::thread::spawn(move || { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { - let mut n = 0; - loop { - println!("#{} Queue len: {}", n, tx_len.len()); - n += 1; - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - }); - }); - while let Ok(msg) = rx_from_external.recv_async().await { // ignore node updates if we have too many messages to handle, in an attempt // to reduce the queue length back to something reasonable, lest it get out of @@ -257,43 +248,33 @@ impl InnerLoop { } } - /// The periodic interval message gives us a chance to do some tidy up or monitoring. - fn handle_interval(&mut self, rx: flume::Sender<()>) { + /// Gather and return some metrics. + fn handle_gather_metrics(&mut self, rx: flume::Sender, total_messages_to_aggregator: usize) { - let node_ids = self.node_ids.len(); - let node_count: usize = self.node_state - .iter_chains() - .map(|c| c.node_count()) - .sum(); - - let shard_cound = self.shard_channels.len(); + let timestamp_unix_ms = time::now(); + let connected_nodes = self.node_ids.len(); + let subscribed_feeds = self.feed_conn_id_to_chain.len(); + let chains_subscribed_to = self.chain_to_feed_conn_ids.len(); + let subscribed_finality_feeds = self.feed_conn_id_finality.len(); + let connected_shards = self.shard_channels.len(); let connected_feeds = self.feed_channels.len(); - let finality_feeds = self.feed_conn_id_finality.len(); - let feed_to_chain = self.feed_conn_id_to_chain.len(); - - let num_subscribed_chains = self.chain_to_feed_conn_ids.len(); - let num_subscribed_chain_feeds: usize = self.chain_to_feed_conn_ids + let total_messages_to_feeds: usize = self.feed_channels .values() .map(|c| c.len()) .sum(); - let num_messages_to_feeds: usize = self.feed_channels - .values() - .map(|c| c.len()) - .sum(); - - println!("Periodic update at {:?}:", std::time::SystemTime::now()); - dbg!(node_ids); - dbg!(node_count); - dbg!(shard_cound); - dbg!(connected_feeds); - dbg!(finality_feeds); - dbg!(feed_to_chain); - dbg!(num_subscribed_chains); - dbg!(num_subscribed_chain_feeds); - dbg!(num_messages_to_feeds); - - drop(rx); + // Ignore error sending; assume the receiver stopped caring and dropped the channel: + let _ = rx.send(Metrics { + timestamp_unix_ms, + chains_subscribed_to, + subscribed_feeds, + subscribed_finality_feeds, + total_messages_to_feeds, + total_messages_to_aggregator, + connected_nodes, + connected_feeds, + connected_shards + }); } /// Handle messages that come from the node geographical locator. diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index c55ade2..1dc039d 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -178,7 +178,11 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> let _ = ws_send.close().await; }, )) - } + }, + // Return metrics in a prometheus-friendly text based format: + (&Method::GET, "/metrics") => { + Ok(return_prometheus_metrics(aggregator).await) + }, // 404 for anything else: _ => Ok(Response::builder() .status(404) @@ -427,9 +431,7 @@ where Some(msgs) => msgs, None => break, }; -if _feed_id == 1 { - println!("FEED 1 message len: {}", msgs.len()); -} + // There is only one message type at the mo; bytes to send // to the websocket. collect them all up to dispatch in one shot. let all_msg_bytes = msgs.into_iter().map(|msg| match msg { @@ -481,3 +483,24 @@ if _feed_id == 1 { // loop ended; give socket back to parent: (tx_to_aggregator, ws_send) } + +async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response { + let metrics = aggregator.latest_metrics(); + + let mut s = String::new(); + for (idx, m) in metrics.iter().enumerate() { + s.push_str(&format!("telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_nodes, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_shards, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n", idx, m.chains_subscribed_to, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.subscribed_finality_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.total_messages_to_feeds, m.timestamp_unix_ms)); + s.push_str(&format!("telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n", idx, m.total_messages_to_aggregator, m.timestamp_unix_ms)); + } + + Response::builder() + .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(s.into()) + .unwrap() +} \ No newline at end of file From 9017f328f08c2b5f9caf597abd6195bd8da426ce Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 11 Aug 2021 16:05:29 +0100 Subject: [PATCH 17/22] Add comment explaining prometheus metrics endpoint body --- backend/telemetry_core/src/main.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 1dc039d..0e2c61b 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -487,6 +487,13 @@ where async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response { let metrics = aggregator.latest_metrics(); + // Instead of using the rust prometheus library, we just split out the text format that prometheus expects + // ourselves, using whatever the latest metrics that we've captured so far are. See: + // + // https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details + // + // For an example and explanation of this text based format. The minimal output we produce here seems to + // be handled correctly when pointing a current version of prometheus at it. let mut s = String::new(); for (idx, m) in metrics.iter().enumerate() { s.push_str(&format!("telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n", idx, m.connected_feeds, m.timestamp_unix_ms)); @@ -500,6 +507,7 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response Date: Thu, 12 Aug 2021 11:21:36 +0100 Subject: [PATCH 18/22] cargo fmt --- backend/common/src/dense_map.rs | 6 +-- backend/common/src/ws_client/sender.rs | 4 +- .../src/aggregator/aggregator.rs | 11 +++-- .../src/aggregator/aggregator_set.rs | 18 +++---- .../src/aggregator/inner_loop.rs | 20 ++++---- backend/telemetry_core/src/main.rs | 48 ++++++++++++++----- backend/telemetry_core/tests/soak_tests.rs | 2 +- backend/test_utils/src/server/channels.rs | 5 +- 8 files changed, 70 insertions(+), 44 deletions(-) diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index 3dfd420..d529eb1 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -139,8 +139,7 @@ mod test { #[test] fn len_doesnt_panic_if_lots_of_retired() { - - let mut map = DenseMap::::new(); + let mut map = DenseMap::::new(); let id1 = map.add(1); let id2 = map.add(2); @@ -163,5 +162,4 @@ mod test { assert_eq!(map.len(), 0); } - -} \ No newline at end of file +} diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 9cb8ee7..b81529c 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -60,7 +60,9 @@ impl Sender { Ok(()) } /// Convert this sender into a Sink - pub fn into_sink(self) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { + pub fn into_sink( + self, + ) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { self.inner.into_sink() } } diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 8ba8cc2..d6604a8 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -62,11 +62,12 @@ impl Aggregator { let (tx_to_aggregator, rx_from_external) = flume::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests - let tx_to_locator = find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| { - future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation( - node_id, msg, - )) - })); + let tx_to_locator = + find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| { + future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation( + node_id, msg, + )) + })); // Handle any incoming messages in our handler loop: tokio::spawn(Aggregator::handle_messages( diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 8fb6251..7dd368d 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -2,9 +2,9 @@ use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; use futures::{Sink, SinkExt}; -use inner_loop::{ Metrics, FromShardWebsocket }; +use inner_loop::{FromShardWebsocket, Metrics}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{ Arc, Mutex }; +use std::sync::{Arc, Mutex}; #[derive(Clone)] pub struct AggregatorSet(Arc); @@ -12,7 +12,7 @@ pub struct AggregatorSet(Arc); pub struct AggregatorSetInner { aggregators: Vec, next_idx: AtomicUsize, - metrics: Mutex> + metrics: Mutex>, } impl AggregatorSet { @@ -28,14 +28,12 @@ impl AggregatorSet { ) .await?; - let initial_metrics = (0..num_aggregators) - .map(|_| Metrics::default()) - .collect(); + let initial_metrics = (0..num_aggregators).map(|_| Metrics::default()).collect(); let this = AggregatorSet(Arc::new(AggregatorSetInner { aggregators, next_idx: AtomicUsize::new(0), - metrics: Mutex::new(initial_metrics) + metrics: Mutex::new(initial_metrics), })); // Start asking for metrics: @@ -60,14 +58,16 @@ impl AggregatorSet { // loop has failed completely. Err(e) => { log::error!("Error obtaining metrics (bailing): {}", e); - return + return; } }; // Lock, update the stored metrics and drop the lock immediately. // We discard any error; if somethign went wrong talking to the inner loop, // it's probably a fatal error - { inner.metrics.lock().unwrap()[idx] = metrics; } + { + inner.metrics.lock().unwrap()[idx] = metrics; + } // Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll // end up waiting longer between requests. diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 72e468b..20ed612 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -102,7 +102,7 @@ pub enum FromFeedWebsocket { } /// A set of metrics returned when we ask for metrics -#[derive(Clone,Debug,Default)] +#[derive(Clone, Debug, Default)] pub struct Metrics { /// When in unix MS from epoch were these metrics obtained pub timestamp_unix_ms: u64, @@ -122,7 +122,7 @@ pub struct Metrics { /// How many feeds are currently connected to this aggregator. pub connected_feeds: usize, /// How many shards are currently connected to this aggregator. - pub connected_shards: usize + pub connected_shards: usize, } // The frontend sends text based commands; parse them into these messages: @@ -220,7 +220,7 @@ impl InnerLoop { } ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location) - }, + } ToAggregator::GatherMetrics(tx) => { self.handle_gather_metrics(tx, metered_rx.len()) } @@ -249,8 +249,11 @@ impl InnerLoop { } /// Gather and return some metrics. - fn handle_gather_metrics(&mut self, rx: flume::Sender, total_messages_to_aggregator: usize) { - + fn handle_gather_metrics( + &mut self, + rx: flume::Sender, + total_messages_to_aggregator: usize, + ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); let subscribed_feeds = self.feed_conn_id_to_chain.len(); @@ -258,10 +261,7 @@ impl InnerLoop { let subscribed_finality_feeds = self.feed_conn_id_finality.len(); let connected_shards = self.shard_channels.len(); let connected_feeds = self.feed_channels.len(); - let total_messages_to_feeds: usize = self.feed_channels - .values() - .map(|c| c.len()) - .sum(); + let total_messages_to_feeds: usize = self.feed_channels.values().map(|c| c.len()).sum(); // Ignore error sending; assume the receiver stopped caring and dropped the channel: let _ = rx.send(Metrics { @@ -273,7 +273,7 @@ impl InnerLoop { total_messages_to_aggregator, connected_nodes, connected_feeds, - connected_shards + connected_shards, }); } diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 0e2c61b..91308b0 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -178,11 +178,9 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> let _ = ws_send.close().await; }, )) - }, + } // Return metrics in a prometheus-friendly text based format: - (&Method::GET, "/metrics") => { - Ok(return_prometheus_metrics(aggregator).await) - }, + (&Method::GET, "/metrics") => Ok(return_prometheus_metrics(aggregator).await), // 404 for anything else: _ => Ok(Response::builder() .status(404) @@ -496,14 +494,38 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response Response Result<(), flume::SendError> { + pub fn send_json_text( + &mut self, + json: serde_json::Value, + ) -> Result<(), flume::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.unbounded_send(ws_client::SentMessage::Text(s)) } From 05a3ba3fefe11dfb09a00d4dec4a1075d2c4f920 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 12 Aug 2021 16:20:05 +0100 Subject: [PATCH 19/22] Fix/expand a few comments --- .../telemetry_core/src/aggregator/aggregator_set.rs | 2 +- backend/telemetry_core/src/main.rs | 7 +++++-- backend/telemetry_core/tests/soak_tests.rs | 10 +++++----- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 7dd368d..c5a324d 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -63,7 +63,7 @@ impl AggregatorSet { }; // Lock, update the stored metrics and drop the lock immediately. - // We discard any error; if somethign went wrong talking to the inner loop, + // We discard any error; if something went wrong talking to the inner loop, // it's probably a fatal error { inner.metrics.lock().unwrap()[idx] = metrics; diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 91308b0..9be898f 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -485,13 +485,16 @@ where async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response { let metrics = aggregator.latest_metrics(); - // Instead of using the rust prometheus library, we just split out the text format that prometheus expects - // ourselves, using whatever the latest metrics that we've captured so far are. See: + // Instead of using the rust prometheus library (which is optimised around global variables updated across a codebase), + // we just split out the text format that prometheus expects ourselves, using whatever the latest metrics that we've + // captured so far from the aggregators are. See: // // https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details // // For an example and explanation of this text based format. The minimal output we produce here seems to // be handled correctly when pointing a current version of prometheus at it. + // + // Note: '{{' and '}}' are just escaped versions of '{' and '}' in Rust fmt strings. let mut s = String::new(); for (idx, m) in metrics.iter().enumerate() { s.push_str(&format!( diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index d033a74..2aef316 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -46,17 +46,17 @@ use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts}; /// /// To start up 4 telemetry_shards and 1 telemetry_core with 10 feeds and 100 nodes: /// ```sh -/// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture +/// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture /// ``` /// /// You can also run this test against the pre-sharding actix binary with something like this: /// ```sh -/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture +/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture /// ``` /// /// Or, you can run it against existing processes on the network with something like this: /// ```sh -/// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture +/// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture /// ``` /// #[ignore] @@ -70,14 +70,14 @@ pub fn soak_test() { .thread_name("telemetry_test_runner") .build() .unwrap() - .block_on(run_realistic_soak_test(opts)); + .block_on(run_soak_test(opts)); } /// A general soak test runner. /// This test sends realistic messages from connected nodes /// so that we can see how things react under more normal /// circumstances -async fn run_realistic_soak_test(opts: SoakTestOpts) { +async fn run_soak_test(opts: SoakTestOpts) { let mut server = start_server( ServerOpts { release_mode: true, From b842c7fc8bbfa5b25cd087283efbd6a62db55790 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 13 Aug 2021 11:33:53 +0100 Subject: [PATCH 20/22] expose dropped message counts and fix some typos/wording --- backend/common/src/dense_map.rs | 2 +- .../src/aggregator/aggregator.rs | 2 +- .../src/aggregator/inner_loop.rs | 18 +++++++++++++++--- backend/telemetry_core/src/main.rs | 8 ++++++-- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index d529eb1..24bc60d 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -138,7 +138,7 @@ mod test { use super::*; #[test] - fn len_doesnt_panic_if_lots_of_retired() { + fn len_doesnt_panic_if_lots_of_ids_are_retired() { let mut map = DenseMap::::new(); let id1 = map.add(1); diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index d6604a8..c172680 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -86,7 +86,7 @@ impl Aggregator { } /// This is spawned into a separate task and handles any messages coming - /// in to the aggregator. If nobody is tolding the tx side of the channel + /// in to the aggregator. If nobody is holding the tx side of the channel /// any more, this task will gracefully end. async fn handle_messages( rx_from_external: flume::Receiver, diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 20ed612..35cc332 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -30,6 +30,7 @@ use std::{ net::{IpAddr, Ipv4Addr}, str::FromStr, }; +use std::sync::{ Arc, atomic::{ Ordering, AtomicU64 } }; /// Incoming messages come via subscriptions, and end up looking like this. #[derive(Clone, Debug)] @@ -38,7 +39,7 @@ pub enum ToAggregator { FromFeedWebsocket(ConnId, FromFeedWebsocket), FromFindLocation(NodeId, find_location::Location), /// Hand back some metrics. The provided sender is expected not to block when - /// a message it sent into it. + /// a message is sent into it. GatherMetrics(flume::Sender), } @@ -117,7 +118,9 @@ pub struct Metrics { pub total_messages_to_feeds: usize, /// How many messages are queued waiting to be handled by this aggregator. pub total_messages_to_aggregator: usize, - /// How many nodes are currently known about by this aggregator. + /// How many (non-critical) messages have been dropped by the aggregator because it was overwhelmed. + pub dropped_messages_to_aggregator: u64, + /// How many nodes are currently known to this aggregator. pub connected_nodes: usize, /// How many feeds are currently connected to this aggregator. pub connected_feeds: usize, @@ -206,9 +209,13 @@ impl InnerLoop { let max_queue_len = self.max_queue_len; let (metered_tx, metered_rx) = flume::unbounded(); + // Keep count of the number of messages we drop for the sake of metric reporting + let dropped_messages = Arc::new(AtomicU64::new(0)); + // Actually handle all of our messages, but before we get here, we // check the length of the queue below to decide whether or not to // pass the message on to this. + let dropped_messages2 = Arc::clone(&dropped_messages); tokio::spawn(async move { while let Ok(msg) = metered_rx.recv_async().await { match msg { @@ -222,7 +229,7 @@ impl InnerLoop { self.handle_from_find_location(node_id, location) } ToAggregator::GatherMetrics(tx) => { - self.handle_gather_metrics(tx, metered_rx.len()) + self.handle_gather_metrics(tx, metered_rx.len(), dropped_messages2.load(Ordering::Relaxed)) } } } @@ -237,6 +244,9 @@ impl InnerLoop { msg, ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }) ) { + // Note: this wraps on overflow (which is probably the best + // behaviour for graphing it anyway) + dropped_messages.fetch_add(1, Ordering::Relaxed); continue; } } @@ -253,6 +263,7 @@ impl InnerLoop { &mut self, rx: flume::Sender, total_messages_to_aggregator: usize, + dropped_messages_to_aggregator: u64 ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); @@ -271,6 +282,7 @@ impl InnerLoop { subscribed_finality_feeds, total_messages_to_feeds, total_messages_to_aggregator, + dropped_messages_to_aggregator, connected_nodes, connected_feeds, connected_shards, diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 9be898f..1508daa 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -486,8 +486,8 @@ async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response Response Date: Fri, 13 Aug 2021 11:35:24 +0100 Subject: [PATCH 21/22] cargo fmt --- .../telemetry_core/src/aggregator/inner_loop.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 35cc332..ac6e144 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -26,11 +26,14 @@ use common::{ time, }; use std::collections::{HashMap, HashSet}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use std::{ net::{IpAddr, Ipv4Addr}, str::FromStr, }; -use std::sync::{ Arc, atomic::{ Ordering, AtomicU64 } }; /// Incoming messages come via subscriptions, and end up looking like this. #[derive(Clone, Debug)] @@ -228,9 +231,11 @@ impl InnerLoop { ToAggregator::FromFindLocation(node_id, location) => { self.handle_from_find_location(node_id, location) } - ToAggregator::GatherMetrics(tx) => { - self.handle_gather_metrics(tx, metered_rx.len(), dropped_messages2.load(Ordering::Relaxed)) - } + ToAggregator::GatherMetrics(tx) => self.handle_gather_metrics( + tx, + metered_rx.len(), + dropped_messages2.load(Ordering::Relaxed), + ), } } }); @@ -263,7 +268,7 @@ impl InnerLoop { &mut self, rx: flume::Sender, total_messages_to_aggregator: usize, - dropped_messages_to_aggregator: u64 + dropped_messages_to_aggregator: u64, ) { let timestamp_unix_ms = time::now(); let connected_nodes = self.node_ids.len(); From 46b0641dfd9ff9b560853d57311186836adaf87e Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 13 Aug 2021 11:45:03 +0100 Subject: [PATCH 22/22] Clarify wording --- backend/telemetry_core/src/aggregator/inner_loop.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index ac6e144..357137d 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -112,9 +112,9 @@ pub struct Metrics { pub timestamp_unix_ms: u64, /// How many chains are feeds currently subscribed to. pub chains_subscribed_to: usize, - /// How many feeds are currently subscribed to something. + /// Number of subscribed feeds. pub subscribed_feeds: usize, - /// How many feeds have asked for finality information, too. + /// Number of subscribed feeds that also asked for finality information. pub subscribed_finality_feeds: usize, /// How many messages are currently queued up in internal channels /// waiting to be sent out to feeds.