From 98c9ccd278fc15ef1d231182a54a3714deb44a0b Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 9 Aug 2021 17:52:56 +0100 Subject: [PATCH] fmt, clean warnings, tidy aggregator opts and add queue length limit --- .../common/src/channel/metered_unbounded.rs | 49 ++++++++++++------- backend/common/src/channel/mod.rs | 2 +- backend/common/src/lib.rs | 2 +- .../src/aggregator/aggregator.rs | 18 +++++-- .../src/aggregator/aggregator_set.rs | 6 +-- .../src/aggregator/inner_loop.rs | 39 +++++++++++---- backend/telemetry_core/src/aggregator/mod.rs | 1 + backend/telemetry_core/src/main.rs | 17 ++++++- 8 files changed, 95 insertions(+), 39 deletions(-) diff --git a/backend/common/src/channel/metered_unbounded.rs b/backend/common/src/channel/metered_unbounded.rs index f76e440..81c8f00 100644 --- a/backend/common/src/channel/metered_unbounded.rs +++ b/backend/common/src/channel/metered_unbounded.rs @@ -1,6 +1,8 @@ -use futures::channel::mpsc::{ SendError, TrySendError, UnboundedSender, UnboundedReceiver, unbounded }; -use futures::{ Sink, Stream, SinkExt, StreamExt }; -use std::sync::atomic::{ AtomicUsize, Ordering }; +use futures::channel::mpsc::{ + unbounded, SendError, TrySendError, UnboundedReceiver, UnboundedSender, +}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; @@ -12,11 +14,11 @@ pub fn metered_unbounded() -> (MeteredUnboundedSender, MeteredUnboundedRec let tx = MeteredUnboundedSender { inner: tx, - len: len + len: len, }; let rx = MeteredUnboundedReceiver { inner: rx, - len: len2 + len: len2, }; (tx, rx) @@ -30,7 +32,7 @@ pub struct MeteredUnboundedSender { len: Arc, } -impl MeteredUnboundedSender { +impl MeteredUnboundedSender { /// The current number of messages in the queue. pub fn len(&self) -> usize { self.len.load(Ordering::Relaxed) @@ -43,10 +45,13 @@ impl MeteredUnboundedSender { } } -impl Sink for MeteredUnboundedSender { +impl Sink for MeteredUnboundedSender { type Error = SendError; - fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { self.inner.poll_ready(cx) } @@ -54,19 +59,28 @@ impl Sink for MeteredUnboundedSender { self.unbounded_send(item).map_err(|e| e.into_send_error()) } - fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { self.inner.poll_flush_unpin(cx) } - fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { self.inner.poll_close_unpin(cx) } } -impl Stream for MeteredUnboundedReceiver { +impl Stream for MeteredUnboundedReceiver { type Item = T; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { let res = self.inner.poll_next_unpin(cx); if matches!(res, Poll::Ready(Some(..))) { self.len.fetch_sub(1, Ordering::Relaxed); @@ -83,7 +97,7 @@ pub struct MeteredUnboundedReceiver { len: Arc, } -impl MeteredUnboundedReceiver { +impl MeteredUnboundedReceiver { /// The current number of messages in the queue. pub fn len(&self) -> usize { self.len.load(Ordering::Relaxed) @@ -137,7 +151,7 @@ mod test { #[tokio::test] async fn channel_len_consistent_when_send_parallelised() { - let (mut tx, mut rx) = metered_unbounded::(); + let (tx, _rx) = metered_unbounded::(); // Send lots of messages on a bunch of real threads: let mut join_handles = vec![]; @@ -156,12 +170,11 @@ mod test { handle.join().unwrap(); } assert_eq!(tx.len(), 50 * 10_000); - } #[tokio::test] async fn channel_len_consistent_when_send_and_recv_parallelised() { - let (mut tx, mut rx) = metered_unbounded::(); + let (tx, mut rx) = metered_unbounded::(); // Send lots of messages on a bunch of real threads: let mut join_handles = vec![]; @@ -185,7 +198,5 @@ mod test { handle.join().unwrap(); } assert_eq!(tx.len(), 0); - } - -} \ No newline at end of file +} diff --git a/backend/common/src/channel/mod.rs b/backend/common/src/channel/mod.rs index dd4b412..3b1b209 100644 --- a/backend/common/src/channel/mod.rs +++ b/backend/common/src/channel/mod.rs @@ -1,3 +1,3 @@ mod metered_unbounded; -pub use metered_unbounded::*; \ No newline at end of file +pub use metered_unbounded::*; diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index b863e45..690f118 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . pub mod byte_size; +pub mod channel; pub mod http_utils; pub mod id_type; pub mod internal_messages; @@ -24,7 +25,6 @@ pub mod ready_chunks_all; pub mod rolling_total; pub mod time; pub mod ws_client; -pub mod channel; mod assign_id; mod dense_map; diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 172a405..7f644c2 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -34,6 +34,16 @@ id_type! { #[derive(Clone)] pub struct Aggregator(Arc); +/// Options to configure the aggregator loop(s) +#[derive(Debug, Clone)] +pub struct AggregatorOpts { + /// Any node from these chains is muted + pub denylist: Vec, + /// If our incoming message queue exceeds this length, we start + /// dropping non-essential messages. + pub max_queue_len: usize, +} + struct AggregatorInternal { /// Shards that connect are each assigned a unique connection ID. /// This helps us know who to send messages back to (especially in @@ -49,7 +59,7 @@ struct AggregatorInternal { impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend - pub async fn spawn(denylist: Vec) -> anyhow::Result { + pub async fn spawn(opts: AggregatorOpts) -> anyhow::Result { let (tx_to_aggregator, rx_from_external) = mpsc::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests @@ -63,7 +73,8 @@ impl Aggregator { tokio::spawn(Aggregator::handle_messages( rx_from_external, tx_to_locator, - denylist, + opts.max_queue_len, + opts.denylist, )); // Return a handle to our aggregator: @@ -80,9 +91,10 @@ impl Aggregator { async fn handle_messages( rx_from_external: mpsc::UnboundedReceiver, tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + max_queue_len: usize, denylist: Vec, ) { - inner_loop::InnerLoop::new(tx_to_aggregator, denylist) + inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len) .handle(rx_from_external) .await; } diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 8ebfa25..5fa991b 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -1,4 +1,4 @@ -use super::aggregator::Aggregator; +use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; use futures::{Sink, SinkExt, StreamExt}; @@ -18,12 +18,12 @@ impl AggregatorSet { /// Spawn the number of aggregators we're asked to. pub async fn spawn( num_aggregators: usize, - denylist: Vec, + opts: AggregatorOpts, ) -> anyhow::Result { assert_ne!(num_aggregators, 0, "You must have 1 or more aggregator"); let aggregators = futures::future::try_join_all( - (0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())), + (0..num_aggregators).map(|_| Aggregator::spawn(opts.clone())), ) .await?; diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 2426090..abfd1fc 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -20,10 +20,10 @@ use crate::find_location; use crate::state::{self, NodeId, State}; use bimap::BiMap; use common::{ + channel::metered_unbounded, internal_messages::{self, MuteReason, ShardNodeId}, node_message, node_types::BlockHash, - channel::metered_unbounded, time, }; use futures::channel::mpsc; @@ -151,6 +151,10 @@ pub struct InnerLoop { /// Send messages here to make geographical location requests. tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + + /// How big can the queue of messages coming in to the aggregator get before messages + /// are prioritised and dropped to try and get back on track. + max_queue_len: usize, } impl InnerLoop { @@ -158,6 +162,7 @@ impl InnerLoop { pub fn new( tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec, + max_queue_len: usize, ) -> Self { InnerLoop { node_state: State::new(denylist), @@ -168,6 +173,7 @@ impl InnerLoop { chain_to_feed_conn_ids: HashMap::new(), feed_conn_id_finality: HashSet::new(), tx_to_locator, + max_queue_len, } } @@ -175,7 +181,7 @@ impl InnerLoop { /// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop /// will be able to make progress quickly without any potential yield points. pub async fn handle(mut self, mut rx_from_external: mpsc::UnboundedReceiver) { - + let max_queue_len = self.max_queue_len; let (metered_tx, mut metered_rx) = metered_unbounded(); tokio::spawn(async move { @@ -197,17 +203,30 @@ impl InnerLoop { // TEMP: let's monitor message queue len out of interest let tx_len = metered_tx.clone(); std::thread::spawn(move || { - tokio::runtime::Runtime::new().unwrap().block_on(async move { - let mut n = 0; - loop { - println!("#{} Queue len: {}", n, tx_len.len()); - n += 1; - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - }); + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let mut n = 0; + loop { + println!("#{} Queue len: {}", n, tx_len.len()); + n += 1; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } + }); }); while let Some(msg) = rx_from_external.next().await { + // ignore node updates if we have too many messages to handle, in an attempt + // to reduce the queue length back to something reasonable. + if metered_tx.len() > max_queue_len { + if matches!( + msg, + ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }) + ) { + continue; + } + } + if let Err(e) = metered_tx.unbounded_send(msg) { log::error!("Cannot send message into aggregator: {}", e); break; diff --git a/backend/telemetry_core/src/aggregator/mod.rs b/backend/telemetry_core/src/aggregator/mod.rs index 2865ed9..9caab51 100644 --- a/backend/telemetry_core/src/aggregator/mod.rs +++ b/backend/telemetry_core/src/aggregator/mod.rs @@ -19,6 +19,7 @@ mod aggregator_set; mod inner_loop; // Expose the various message types that can be worked with externally: +pub use aggregator::AggregatorOpts; pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket}; pub use aggregator_set::*; diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index e7f1414..beed2ec 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -22,7 +22,8 @@ use std::str::FromStr; use tokio::time::{Duration, Instant}; use aggregator::{ - AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket, + AggregatorOpts, AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, + ToShardWebsocket, }; use bincode::Options; use common::http_utils; @@ -67,6 +68,10 @@ struct Opts { /// aggregators. #[structopt(long)] num_aggregators: Option, + /// How big can the message queue for each aggregator grow before we start dropping non-essential + /// messages in an attempt to let it reduce? + #[structopt(long)] + aggregator_queue_len: Option, } fn main() { @@ -110,7 +115,15 @@ fn main() { /// Declare our routes and start the server. async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> { - let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?; + let aggregator_queue_len = opts.aggregator_queue_len.unwrap_or(10_000); + let aggregator = AggregatorSet::spawn( + num_aggregators, + AggregatorOpts { + max_queue_len: aggregator_queue_len, + denylist: opts.denylist, + }, + ) + .await?; let socket_addr = opts.socket; let feed_timeout = opts.feed_timeout;