diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 2536d50..9de9792 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -202,6 +202,7 @@ dependencies = [ "bimap", "bincode", "bytes", + "flume", "fnv", "futures", "hex", @@ -392,6 +393,19 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "flume" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e90cc80fad5bb391b38127896b0fa27d97e7fef74742797f4da518d67e1292f" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spinning_top", +] + [[package]] name = "fnv" version = "1.0.7" @@ -540,8 +554,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -827,6 +843,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nanorand" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.7" @@ -977,6 +1002,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -1458,6 +1503,15 @@ dependencies = [ "sha-1", ] +[[package]] +name = "spinning_top" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75adad84ee84b521fb2cca2d4fd0f1dab1d8d026bda3c5bea4ca63b5f9f9293c" +dependencies = [ + "lock_api", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -1521,6 +1575,7 @@ dependencies = [ "bytes", "common", "criterion", + "flume", "futures", "hex", "http", @@ -1553,6 +1608,7 @@ dependencies = [ "anyhow", "bincode", "common", + "flume", "futures", "hex", "http", @@ -1590,6 +1646,7 @@ version = "0.1.0" dependencies = [ "anyhow", "common", + "flume", "futures", "http", "log", diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml index a6fcd58..a0ccdfa 100644 --- a/backend/common/Cargo.toml +++ b/backend/common/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0.42" base64 = { default-features = false, features = ["alloc"], version = "0.13" } bimap = "0.6.1" bytes = "1.0.1" +flume = "0.10.8" fnv = "1.0.7" futures = "0.3.15" hex = "0.4.3" diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index f3ee90f..24bc60d 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -131,3 +131,35 @@ where } } } + +#[cfg(test)] +mod test { + + use super::*; + + #[test] + fn len_doesnt_panic_if_lots_of_ids_are_retired() { + let mut map = DenseMap::::new(); + + let id1 = map.add(1); + let id2 = map.add(2); + let id3 = map.add(3); + + assert_eq!(map.len(), 3); + + map.remove(id1); + map.remove(id2); + + assert_eq!(map.len(), 1); + + map.remove(id3); + + assert_eq!(map.len(), 0); + + map.remove(id1); + map.remove(id1); + map.remove(id1); + + assert_eq!(map.len(), 0); + } +} diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 0ec6256..787a88b 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -14,8 +14,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . use super::on_close::OnClose; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; use soketto::handshake::{Client, ServerResponse}; use std::sync::Arc; use tokio::net::TcpStream; @@ -73,7 +71,7 @@ impl Connection { let mut rx_closed2 = tx_closed1.subscribe(); // Receive messages from the socket: - let (mut tx_to_external, rx_from_ws) = mpsc::unbounded(); + let (tx_to_external, rx_from_ws) = flume::unbounded(); tokio::spawn(async move { let mut send_to_external = true; loop { @@ -112,7 +110,7 @@ impl Connection { .map_err(|e| e.into()), }; - if let Err(e) = tx_to_external.send(msg).await { + if let Err(e) = tx_to_external.send_async(msg).await { // Our external channel may have closed or errored, but the socket hasn't // been closed, so keep receiving in order to allow the socket to continue to // function properly (we may be happy just sending messages to it), but stop @@ -124,12 +122,12 @@ impl Connection { }); // Send messages to the socket: - let (tx_to_ws, mut rx_from_external) = mpsc::unbounded(); + let (tx_to_ws, rx_from_external) = flume::unbounded::(); tokio::spawn(async move { loop { // Wait for messages, or bail entirely if asked to close. let msg = tokio::select! { - msg = rx_from_external.next() => { msg }, + msg = rx_from_external.recv_async() => { msg }, _ = rx_closed2.recv() => { // attempt to gracefully end the connection. let _ = ws_to_connection.close().await; @@ -141,8 +139,8 @@ impl Connection { // needs to keep receiving data for the WS connection to stay open, there's no // reason to keep this side of the loop open if our channel is closed. let msg = match msg { - None => break, - Some(msg) => msg, + Ok(msg) => msg, + _ => break, }; // We don't explicitly shut down the channel if we hit send errors. Why? Because the @@ -207,7 +205,7 @@ impl Connection { closer: Arc::clone(&on_close), }, Receiver { - inner: rx_from_ws, + inner: rx_from_ws.into_stream(), closer: on_close, }, ) diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index 6f45da7..e1e2397 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -15,13 +15,12 @@ // along with this program. If not, see . use super::on_close::OnClose; -use futures::channel::mpsc; use futures::{Stream, StreamExt}; use std::sync::Arc; /// Receive messages out of a connection pub struct Receiver { - pub(super) inner: mpsc::UnboundedReceiver>, + pub(super) inner: flume::r#async::RecvStream<'static, Result>, pub(super) closer: Arc, } diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 45c3d66..b81529c 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -15,8 +15,6 @@ // along with this program. If not, see . use super::on_close::OnClose; -use futures::channel::mpsc; -use futures::{Sink, SinkExt}; use std::sync::Arc; /// A message that can be sent into the channel interface @@ -41,62 +39,38 @@ pub enum SentMessage { /// Send messages into the connection #[derive(Clone)] pub struct Sender { - pub(super) inner: mpsc::UnboundedSender, + pub(super) inner: flume::Sender, pub(super) closer: Arc, } impl Sender { /// Ask the underlying Websocket connection to close. - pub async fn close(&mut self) -> Result<(), SendError> { + pub async fn close(&mut self) -> Result<(), SendError> { self.closer.0.send(()).map_err(|_| SendError::CloseError)?; Ok(()) } /// Returns whether this channel is closed. pub fn is_closed(&self) -> bool { - self.inner.is_closed() + self.inner.is_disconnected() } /// Unbounded send will always queue the message and doesn't /// need to be awaited. - pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), SendError> { - self.inner - .unbounded_send(msg) - .map_err(|e| e.into_send_error())?; + pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), flume::SendError> { + self.inner.send(msg)?; Ok(()) } + /// Convert this sender into a Sink + pub fn into_sink( + self, + ) -> impl futures::Sink + std::marker::Unpin + Clone + 'static { + self.inner.into_sink() + } } #[derive(thiserror::Error, Debug, Clone)] -pub enum SendError { +pub enum SendError { #[error("Failed to send message: {0}")] - ChannelError(#[from] mpsc::SendError), + ChannelError(#[from] flume::SendError), #[error("Failed to send close message")] CloseError, } - -impl Sink for Sender { - type Error = SendError; - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_ready_unpin(cx).map_err(|e| e.into()) - } - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: SentMessage, - ) -> Result<(), Self::Error> { - self.inner.start_send_unpin(item).map_err(|e| e.into()) - } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_flush_unpin(cx).map_err(|e| e.into()) - } - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_close_unpin(cx).map_err(|e| e.into()) - } -} diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 495b460..1917d8a 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -11,6 +11,7 @@ bimap = "0.6.1" bincode = "1.3.3" bytes = "1.0.1" common = { path = "../common" } +flume = "0.10.8" futures = "0.3.15" hex = "0.4.3" http = "0.2.4" diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 7c975ee..c172680 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -18,7 +18,6 @@ use super::inner_loop; use crate::find_location::find_location; use crate::state::NodeId; use common::id_type; -use futures::channel::mpsc; use futures::{future, Sink, SinkExt}; use std::net::Ipv4Addr; use std::sync::atomic::AtomicU64; @@ -34,6 +33,16 @@ id_type! { #[derive(Clone)] pub struct Aggregator(Arc); +/// Options to configure the aggregator loop(s) +#[derive(Debug, Clone)] +pub struct AggregatorOpts { + /// Any node from these chains is muted + pub denylist: Vec, + /// If our incoming message queue exceeds this length, we start + /// dropping non-essential messages. + pub max_queue_len: usize, +} + struct AggregatorInternal { /// Shards that connect are each assigned a unique connection ID. /// This helps us know who to send messages back to (especially in @@ -44,26 +53,28 @@ struct AggregatorInternal { /// Send messages in to the aggregator from the outside via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::UnboundedSender, + tx_to_aggregator: flume::Sender, } impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend - pub async fn spawn(denylist: Vec) -> anyhow::Result { - let (tx_to_aggregator, rx_from_external) = mpsc::unbounded(); + pub async fn spawn(opts: AggregatorOpts) -> anyhow::Result { + let (tx_to_aggregator, rx_from_external) = flume::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests - let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| { - future::ok::<_, mpsc::SendError>(inner_loop::ToAggregator::FromFindLocation( - node_id, msg, - )) - })); + let tx_to_locator = + find_location(tx_to_aggregator.clone().into_sink().with(|(node_id, msg)| { + future::ok::<_, flume::SendError<_>>(inner_loop::ToAggregator::FromFindLocation( + node_id, msg, + )) + })); // Handle any incoming messages in our handler loop: tokio::spawn(Aggregator::handle_messages( rx_from_external, tx_to_locator, - denylist, + opts.max_queue_len, + opts.denylist, )); // Return a handle to our aggregator: @@ -74,19 +85,31 @@ impl Aggregator { }))) } - // This is spawned into a separate task and handles any messages coming - // in to the aggregator. If nobody is tolding the tx side of the channel - // any more, this task will gracefully end. + /// This is spawned into a separate task and handles any messages coming + /// in to the aggregator. If nobody is holding the tx side of the channel + /// any more, this task will gracefully end. async fn handle_messages( - rx_from_external: mpsc::UnboundedReceiver, - tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + rx_from_external: flume::Receiver, + tx_to_aggregator: flume::Sender<(NodeId, Ipv4Addr)>, + max_queue_len: usize, denylist: Vec, ) { - inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist) - .handle() + inner_loop::InnerLoop::new(tx_to_aggregator, denylist, max_queue_len) + .handle(rx_from_external) .await; } + /// Gather metrics from our aggregator loop + pub async fn gather_metrics(&self) -> anyhow::Result { + let (tx, rx) = flume::unbounded(); + let msg = inner_loop::ToAggregator::GatherMetrics(tx); + + self.0.tx_to_aggregator.send_async(msg).await?; + + let metrics = rx.recv_async().await?; + Ok(metrics) + } + /// Return a sink that a shard can send messages into to be handled by the aggregator. pub fn subscribe_shard( &self, @@ -102,7 +125,7 @@ impl Aggregator { // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: - Box::pin(tx_to_aggregator.with(move |msg| async move { + Box::pin(tx_to_aggregator.into_sink().with(move |msg| async move { Ok(inner_loop::ToAggregator::FromShardWebsocket( shard_conn_id.into(), msg, @@ -129,7 +152,7 @@ impl Aggregator { // but pinning by boxing is the easy solution for now: ( feed_conn_id, - Box::pin(tx_to_aggregator.with(move |msg| async move { + Box::pin(tx_to_aggregator.into_sink().with(move |msg| async move { Ok(inner_loop::ToAggregator::FromFeedWebsocket( feed_conn_id.into(), msg, diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs index 8ebfa25..c5a324d 100644 --- a/backend/telemetry_core/src/aggregator/aggregator_set.rs +++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs @@ -1,10 +1,10 @@ -use super::aggregator::Aggregator; +use super::aggregator::{Aggregator, AggregatorOpts}; use super::inner_loop; use common::EitherSink; -use futures::{Sink, SinkExt, StreamExt}; -use inner_loop::FromShardWebsocket; +use futures::{Sink, SinkExt}; +use inner_loop::{FromShardWebsocket, Metrics}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; #[derive(Clone)] pub struct AggregatorSet(Arc); @@ -12,25 +12,74 @@ pub struct AggregatorSet(Arc); pub struct AggregatorSetInner { aggregators: Vec, next_idx: AtomicUsize, + metrics: Mutex>, } impl AggregatorSet { /// Spawn the number of aggregators we're asked to. pub async fn spawn( num_aggregators: usize, - denylist: Vec, + opts: AggregatorOpts, ) -> anyhow::Result { assert_ne!(num_aggregators, 0, "You must have 1 or more aggregator"); let aggregators = futures::future::try_join_all( - (0..num_aggregators).map(|_| Aggregator::spawn(denylist.clone())), + (0..num_aggregators).map(|_| Aggregator::spawn(opts.clone())), ) .await?; - Ok(AggregatorSet(Arc::new(AggregatorSetInner { + let initial_metrics = (0..num_aggregators).map(|_| Metrics::default()).collect(); + + let this = AggregatorSet(Arc::new(AggregatorSetInner { aggregators, next_idx: AtomicUsize::new(0), - }))) + metrics: Mutex::new(initial_metrics), + })); + + // Start asking for metrics: + this.spawn_metrics_loops(); + + Ok(this) + } + + /// Spawn loops which periodically ask for metrics from each internal aggregator. + /// Depending on how busy the aggregators are, these metrics won't necessarily be in + /// sync with each other. + fn spawn_metrics_loops(&self) { + let aggregators = self.0.aggregators.clone(); + for (idx, a) in aggregators.into_iter().enumerate() { + let inner = Arc::clone(&self.0); + tokio::spawn(async move { + loop { + let now = tokio::time::Instant::now(); + let metrics = match a.gather_metrics().await { + Ok(metrics) => metrics, + // Any error here is unlikely and probably means that the aggregator + // loop has failed completely. + Err(e) => { + log::error!("Error obtaining metrics (bailing): {}", e); + return; + } + }; + + // Lock, update the stored metrics and drop the lock immediately. + // We discard any error; if something went wrong talking to the inner loop, + // it's probably a fatal error + { + inner.metrics.lock().unwrap()[idx] = metrics; + } + + // Sleep *at least* 10 seconds. If it takes a while to get metrics back, we'll + // end up waiting longer between requests. + tokio::time::sleep_until(now + tokio::time::Duration::from_secs(10)).await; + } + }); + } + } + + /// Return the latest metrics we've gathered so far from each internal aggregator. + pub fn latest_metrics(&self) -> Vec { + self.0.metrics.lock().unwrap().clone() } /// Return a sink that a shard can send messages into to be handled by all aggregators. @@ -52,10 +101,11 @@ impl AggregatorSet { .map(|a| a.subscribe_shard()) .collect(); + let (tx, rx) = flume::unbounded::(); + // Send every incoming message to all aggregators. - let (tx, mut rx) = futures::channel::mpsc::unbounded::(); tokio::spawn(async move { - while let Some(msg) = rx.next().await { + while let Ok(msg) = rx.recv_async().await { for conn in &mut conns { // Unbounded channel under the hood, so this await // shouldn't ever need to yield. @@ -67,7 +117,7 @@ impl AggregatorSet { } }); - EitherSink::b(tx.sink_map_err(|e| anyhow::anyhow!("{}", e))) + EitherSink::b(tx.into_sink().sink_map_err(|e| anyhow::anyhow!("{}", e))) } /// Return a sink that a feed can send messages into to be handled by a single aggregator. diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 6cfec73..357137d 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -25,9 +25,11 @@ use common::{ node_types::BlockHash, time, }; -use futures::channel::mpsc; -use futures::StreamExt; use std::collections::{HashMap, HashSet}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; use std::{ net::{IpAddr, Ipv4Addr}, str::FromStr, @@ -39,6 +41,9 @@ pub enum ToAggregator { FromShardWebsocket(ConnId, FromShardWebsocket), FromFeedWebsocket(ConnId, FromFeedWebsocket), FromFindLocation(NodeId, find_location::Location), + /// Hand back some metrics. The provided sender is expected not to block when + /// a message is sent into it. + GatherMetrics(flume::Sender), } /// An incoming shard connection can send these messages to the aggregator. @@ -47,7 +52,7 @@ pub enum FromShardWebsocket { /// When the socket is opened, it'll send this first /// so that we have a way to communicate back to it. Initialize { - channel: mpsc::UnboundedSender, + channel: flume::Sender, }, /// Tell the aggregator about a new node. Add { @@ -85,7 +90,7 @@ pub enum FromFeedWebsocket { /// Unbounded so that slow feeds don't block aggregato /// progress. Initialize { - channel: mpsc::UnboundedSender, + channel: flume::Sender, }, /// The feed can subscribe to a chain to receive /// messages relating to it. @@ -100,6 +105,32 @@ pub enum FromFeedWebsocket { Disconnected, } +/// A set of metrics returned when we ask for metrics +#[derive(Clone, Debug, Default)] +pub struct Metrics { + /// When in unix MS from epoch were these metrics obtained + pub timestamp_unix_ms: u64, + /// How many chains are feeds currently subscribed to. + pub chains_subscribed_to: usize, + /// Number of subscribed feeds. + pub subscribed_feeds: usize, + /// Number of subscribed feeds that also asked for finality information. + pub subscribed_finality_feeds: usize, + /// How many messages are currently queued up in internal channels + /// waiting to be sent out to feeds. + pub total_messages_to_feeds: usize, + /// How many messages are queued waiting to be handled by this aggregator. + pub total_messages_to_aggregator: usize, + /// How many (non-critical) messages have been dropped by the aggregator because it was overwhelmed. + pub dropped_messages_to_aggregator: u64, + /// How many nodes are currently known to this aggregator. + pub connected_nodes: usize, + /// How many feeds are currently connected to this aggregator. + pub connected_feeds: usize, + /// How many shards are currently connected to this aggregator. + pub connected_shards: usize, +} + // The frontend sends text based commands; parse them into these messages: impl FromStr for FromFeedWebsocket { type Err = anyhow::Error; @@ -127,9 +158,6 @@ pub enum ToFeedWebsocket { /// Instances of this are responsible for handling incoming and /// outgoing messages in the main aggregator loop. pub struct InnerLoop { - /// Messages from the outside world come into this: - rx_from_external: mpsc::UnboundedReceiver, - /// The state of our chains and nodes lives here: node_state: State, /// We maintain a mapping between NodeId and ConnId+LocalId, so that we know @@ -137,9 +165,9 @@ pub struct InnerLoop { node_ids: BiMap, /// Keep track of how to send messages out to feeds. - feed_channels: HashMap>, + feed_channels: HashMap>, /// Keep track of how to send messages out to shards. - shard_channels: HashMap>, + shard_channels: HashMap>, /// Which chain is a feed subscribed to? /// Feed Connection ID -> Chain Genesis Hash @@ -152,18 +180,21 @@ pub struct InnerLoop { feed_conn_id_finality: HashSet, /// Send messages here to make geographical location requests. - tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, + + /// How big can the queue of messages coming in to the aggregator get before messages + /// are prioritised and dropped to try and get back on track. + max_queue_len: usize, } impl InnerLoop { /// Create a new inner loop handler with the various state it needs. pub fn new( - rx_from_external: mpsc::UnboundedReceiver, - tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, + tx_to_locator: flume::Sender<(NodeId, Ipv4Addr)>, denylist: Vec, + max_queue_len: usize, ) -> Self { InnerLoop { - rx_from_external, node_state: State::new(denylist), node_ids: BiMap::new(), feed_channels: HashMap::new(), @@ -172,28 +203,97 @@ impl InnerLoop { chain_to_feed_conn_ids: HashMap::new(), feed_conn_id_finality: HashSet::new(), tx_to_locator, + max_queue_len, } } - /// Start handling and responding to incoming messages. Owing to unbounded channels, we actually - /// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop - /// will be able to make progress quickly without any potential yield points. - pub async fn handle(mut self) { - while let Some(msg) = self.rx_from_external.next().await { - match msg { - ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { - self.handle_from_feed(feed_conn_id, msg) - } - ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { - self.handle_from_shard(shard_conn_id, msg) - } - ToAggregator::FromFindLocation(node_id, location) => { - self.handle_from_find_location(node_id, location) + /// Start handling and responding to incoming messages. + pub async fn handle(mut self, rx_from_external: flume::Receiver) { + let max_queue_len = self.max_queue_len; + let (metered_tx, metered_rx) = flume::unbounded(); + + // Keep count of the number of messages we drop for the sake of metric reporting + let dropped_messages = Arc::new(AtomicU64::new(0)); + + // Actually handle all of our messages, but before we get here, we + // check the length of the queue below to decide whether or not to + // pass the message on to this. + let dropped_messages2 = Arc::clone(&dropped_messages); + tokio::spawn(async move { + while let Ok(msg) = metered_rx.recv_async().await { + match msg { + ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { + self.handle_from_feed(feed_conn_id, msg) + } + ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { + self.handle_from_shard(shard_conn_id, msg) + } + ToAggregator::FromFindLocation(node_id, location) => { + self.handle_from_find_location(node_id, location) + } + ToAggregator::GatherMetrics(tx) => self.handle_gather_metrics( + tx, + metered_rx.len(), + dropped_messages2.load(Ordering::Relaxed), + ), } } + }); + + while let Ok(msg) = rx_from_external.recv_async().await { + // ignore node updates if we have too many messages to handle, in an attempt + // to reduce the queue length back to something reasonable, lest it get out of + // control and start consuming a load of memory. + if metered_tx.len() > max_queue_len { + if matches!( + msg, + ToAggregator::FromShardWebsocket(.., FromShardWebsocket::Update { .. }) + ) { + // Note: this wraps on overflow (which is probably the best + // behaviour for graphing it anyway) + dropped_messages.fetch_add(1, Ordering::Relaxed); + continue; + } + } + + if let Err(e) = metered_tx.send(msg) { + log::error!("Cannot send message into aggregator: {}", e); + break; + } } } + /// Gather and return some metrics. + fn handle_gather_metrics( + &mut self, + rx: flume::Sender, + total_messages_to_aggregator: usize, + dropped_messages_to_aggregator: u64, + ) { + let timestamp_unix_ms = time::now(); + let connected_nodes = self.node_ids.len(); + let subscribed_feeds = self.feed_conn_id_to_chain.len(); + let chains_subscribed_to = self.chain_to_feed_conn_ids.len(); + let subscribed_finality_feeds = self.feed_conn_id_finality.len(); + let connected_shards = self.shard_channels.len(); + let connected_feeds = self.feed_channels.len(); + let total_messages_to_feeds: usize = self.feed_channels.values().map(|c| c.len()).sum(); + + // Ignore error sending; assume the receiver stopped caring and dropped the channel: + let _ = rx.send(Metrics { + timestamp_unix_ms, + chains_subscribed_to, + subscribed_feeds, + subscribed_finality_feeds, + total_messages_to_feeds, + total_messages_to_aggregator, + dropped_messages_to_aggregator, + connected_nodes, + connected_feeds, + connected_shards, + }); + } + /// Handle messages that come from the node geographical locator. fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) { self.node_state @@ -237,7 +337,7 @@ impl InnerLoop { match self.node_state.add_node(genesis_hash, node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute { + let _ = shard_conn.send(ToShardWebsocket::Mute { local_id, reason: MuteReason::ChainNotAllowed, }); @@ -245,7 +345,7 @@ impl InnerLoop { } state::AddNodeResult::ChainOverQuota => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute { + let _ = shard_conn.send(ToShardWebsocket::Mute { local_id, reason: MuteReason::Overquota, }); @@ -286,7 +386,7 @@ impl InnerLoop { // Ask for the grographical location of the node. // Currently we only geographically locate IPV4 addresses so ignore IPV6. if let IpAddr::V4(ip_v4) = ip { - let _ = self.tx_to_locator.unbounded_send((node_id, ip_v4)); + let _ = self.tx_to_locator.send((node_id, ip_v4)); } } } @@ -369,7 +469,7 @@ impl InnerLoop { // Send this to the channel that subscribed: if let Some(bytes) = feed_serializer.into_finalized() { - let _ = channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = channel.send(ToFeedWebsocket::Bytes(bytes)); } } FromFeedWebsocket::Ping { value } => { @@ -382,7 +482,7 @@ impl InnerLoop { let mut feed_serializer = FeedMessageSerializer::new(); feed_serializer.push(feed_message::Pong(&value)); if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } } FromFeedWebsocket::Subscribe { chain } => { @@ -430,7 +530,7 @@ impl InnerLoop { new_chain.finalized_block().hash, )); if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } // If many (eg 10k) nodes are connected, serializing all of their info takes time. @@ -465,7 +565,7 @@ impl InnerLoop { }) .collect(); for bytes in all_feed_messages { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)); } // Actually make a note of the new chain subsciption: @@ -580,7 +680,7 @@ impl InnerLoop { if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) { for &feed_id in feeds { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { - let _ = chan.unbounded_send(message.clone()); + let _ = chan.send(message.clone()); } } } @@ -596,7 +696,7 @@ impl InnerLoop { /// Send a message to everybody. fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) { for chan in self.feed_channels.values_mut() { - let _ = chan.unbounded_send(message.clone()); + let _ = chan.send(message.clone()); } } @@ -622,7 +722,7 @@ impl InnerLoop { // are also subscribed to receive finality updates. for &feed_id in feeds.union(&self.feed_conn_id_finality) { if let Some(chan) = self.feed_channels.get_mut(&feed_id) { - let _ = chan.unbounded_send(message.clone()); + let _ = chan.send(message.clone()); } } } diff --git a/backend/telemetry_core/src/aggregator/mod.rs b/backend/telemetry_core/src/aggregator/mod.rs index 2865ed9..9caab51 100644 --- a/backend/telemetry_core/src/aggregator/mod.rs +++ b/backend/telemetry_core/src/aggregator/mod.rs @@ -19,6 +19,7 @@ mod aggregator_set; mod inner_loop; // Expose the various message types that can be worked with externally: +pub use aggregator::AggregatorOpts; pub use inner_loop::{FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket}; pub use aggregator_set::*; diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs index 680a123..f020747 100644 --- a/backend/telemetry_core/src/find_location.rs +++ b/backend/telemetry_core/src/find_location.rs @@ -17,8 +17,7 @@ use std::net::Ipv4Addr; use std::sync::Arc; -use futures::channel::mpsc; -use futures::{Sink, SinkExt, StreamExt}; +use futures::{Sink, SinkExt}; use parking_lot::RwLock; use rustc_hash::FxHashMap; use serde::Deserialize; @@ -31,12 +30,12 @@ pub type Location = Option>; /// This is responsible for taking an IP address and attempting /// to find a geographical location from this -pub fn find_location(response_chan: R) -> mpsc::UnboundedSender<(Id, Ipv4Addr)> +pub fn find_location(response_chan: R) -> flume::Sender<(Id, Ipv4Addr)> where R: Sink<(Id, Option>)> + Unpin + Send + Clone + 'static, Id: Clone + Send + 'static, { - let (tx, mut rx) = mpsc::unbounded(); + let (tx, rx) = flume::unbounded(); // cache entries let mut cache: FxHashMap>> = FxHashMap::default(); @@ -61,7 +60,7 @@ where let semaphore = Arc::new(Semaphore::new(4)); loop { - while let Some((id, ip_address)) = rx.next().await { + while let Ok((id, ip_address)) = rx.recv_async().await { let permit = semaphore.clone().acquire_owned().await.unwrap(); let mut response_chan = response_chan.clone(); let locator = locator.clone(); diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index e7f1414..1508daa 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -22,13 +22,14 @@ use std::str::FromStr; use tokio::time::{Duration, Instant}; use aggregator::{ - AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket, + AggregatorOpts, AggregatorSet, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, + ToShardWebsocket, }; use bincode::Options; use common::http_utils; use common::internal_messages; use common::ready_chunks_all::ReadyChunksAll; -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; @@ -67,6 +68,10 @@ struct Opts { /// aggregators. #[structopt(long)] num_aggregators: Option, + /// How big can the message queue for each aggregator grow before we start dropping non-essential + /// messages in an attempt to let it reduce? + #[structopt(long)] + aggregator_queue_len: Option, } fn main() { @@ -110,7 +115,15 @@ fn main() { /// Declare our routes and start the server. async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> { - let aggregator = AggregatorSet::spawn(num_aggregators, opts.denylist).await?; + let aggregator_queue_len = opts.aggregator_queue_len.unwrap_or(10_000); + let aggregator = AggregatorSet::spawn( + num_aggregators, + AggregatorOpts { + max_queue_len: aggregator_queue_len, + denylist: opts.denylist, + }, + ) + .await?; let socket_addr = opts.socket; let feed_timeout = opts.feed_timeout; @@ -166,6 +179,8 @@ async fn start_server(num_aggregators: usize, opts: Opts) -> anyhow::Result<()> }, )) } + // Return metrics in a prometheus-friendly text based format: + (&Method::GET, "/metrics") => Ok(return_prometheus_metrics(aggregator).await), // 404 for anything else: _ => Ok(Response::builder() .status(404) @@ -188,7 +203,8 @@ async fn handle_shard_websocket_connection( where S: futures::Sink + Unpin + Send + 'static, { - let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::unbounded(); + let (tx_to_shard_conn, rx_from_aggregator) = flume::unbounded(); + let mut rx_from_aggregator = rx_from_aggregator.into_stream(); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromShardWebsocket::Initialize { @@ -330,8 +346,8 @@ where S: futures::Sink + Unpin + Send + 'static, { // unbounded channel so that slow feeds don't block aggregator progress: - let (tx_to_feed_conn, rx_from_aggregator) = mpsc::unbounded(); - let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator); + let (tx_to_feed_conn, rx_from_aggregator) = flume::unbounded(); + let mut rx_from_aggregator_chunks = ReadyChunksAll::new(rx_from_aggregator.into_stream()); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromFeedWebsocket::Initialize { @@ -350,7 +366,6 @@ where let recv_handle = tokio::spawn(async move { loop { let mut bytes = Vec::new(); - // Receive a message, or bail if closer called. We don't care about cancel safety; // if we're halfway through receiving a message, no biggie since we're closing the // connection anyway. @@ -466,3 +481,63 @@ where // loop ended; give socket back to parent: (tx_to_aggregator, ws_send) } + +async fn return_prometheus_metrics(aggregator: AggregatorSet) -> Response { + let metrics = aggregator.latest_metrics(); + + // Instead of using the rust prometheus library (which is optimised around global variables updated across a codebase), + // we just split out the text format that prometheus expects ourselves, and use the latest metrics that we've + // captured so far from the aggregators. See: + // + // https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details + // + // For an example and explanation of this text based format. The minimal output we produce here seems to + // be handled correctly when pointing a current version of prometheus at it. + // + // Note: '{{' and '}}' are just escaped versions of '{' and '}' in Rust fmt strings. + let mut s = String::new(); + for (idx, m) in metrics.iter().enumerate() { + s.push_str(&format!( + "telemetry_connected_feeds{{aggregator=\"{}\"}} {} {}\n", + idx, m.connected_feeds, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_connected_nodes{{aggregator=\"{}\"}} {} {}\n", + idx, m.connected_nodes, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_connected_shards{{aggregator=\"{}\"}} {} {}\n", + idx, m.connected_shards, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_chains_subscribed_to{{aggregator=\"{}\"}} {} {}\n", + idx, m.chains_subscribed_to, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_subscribed_feeds{{aggregator=\"{}\"}} {} {}\n", + idx, m.subscribed_feeds, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_subscribed_finality_feeds{{aggregator=\"{}\"}} {} {}\n", + idx, m.subscribed_finality_feeds, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_total_messages_to_feeds{{aggregator=\"{}\"}} {} {}\n", + idx, m.total_messages_to_feeds, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_total_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n", + idx, m.total_messages_to_aggregator, m.timestamp_unix_ms + )); + s.push_str(&format!( + "telemetry_dropped_messages_to_aggregator{{aggregator=\"{}\"}} {} {}\n\n", + idx, m.dropped_messages_to_aggregator, m.timestamp_unix_ms + )); + } + + Response::builder() + // The version number here tells prometheus which version of the text format we're using: + .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(s.into()) + .unwrap() +} diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 78b3dbe..2aef316 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -34,19 +34,17 @@ In general, if you run into issues, it may be better to run this on a linux box; MacOS seems to hit limits quicker in general. */ -use common::node_types::BlockHash; use common::ws_client::SentMessage; use futures::{future, StreamExt}; -use serde_json::json; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts}; -/// A configurable soak_test runner. Configure by providing the expected args as -/// an environment variable. One example to run this test is: +/// A test runner which sends realistic(ish) messages from fake nodes to a telemetry server. /// +/// To start up 4 telemetry_shards and 1 telemetry_core with 10 feeds and 100 nodes: /// ```sh /// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture /// ``` @@ -56,211 +54,30 @@ use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts}; /// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture /// ``` /// -/// Or, you can run it against existing processes with something like this: +/// Or, you can run it against existing processes on the network with something like this: /// ```sh /// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- soak_test --ignored --nocapture /// ``` /// -/// Each will establish the same total number of connections and send the same messages. #[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -pub async fn soak_test() { +#[test] +pub fn soak_test() { let opts = get_soak_test_opts(); - run_soak_test(opts).await; -} -/// A general soak test runner. -/// This test sends the same message over and over, and so -/// the results should be pretty reproducible. -async fn run_soak_test(opts: SoakTestOpts) { - let mut server = start_server( - ServerOpts { - release_mode: true, - log_output: opts.log_output, - ..Default::default() - }, - CoreOpts { - worker_threads: opts.core_worker_threads, - ..Default::default() - }, - ShardOpts { - worker_threads: opts.shard_worker_threads, - ..Default::default() - }, - ) - .await; - println!("Telemetry core running at {}", server.get_core().host()); - - // Start up the shards we requested: - let mut shard_ids = vec![]; - for _ in 0..opts.shards { - let shard_id = server.add_shard().await.expect("shard can't be added"); - shard_ids.push(shard_id); - } - - // Connect nodes to each shard: - let mut nodes = vec![]; - for &shard_id in &shard_ids { - let mut conns = server - .get_shard(shard_id) - .unwrap() - .connect_multiple_nodes(opts.nodes) - .await - .expect("node connections failed"); - nodes.append(&mut conns); - } - - // Each node tells the shard about itself: - for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { - node_tx - .send_json_binary(json!({ - "id":1, // Only needs to be unique per node - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain": "Polkadot", // <- so that we don't go over quota with lots of nodes. - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Node #{}", idx), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - })) - .unwrap(); - } - - // Connect feeds to the core: - let mut feeds = server - .get_core() - .connect_multiple_feeds(opts.feeds) - .await - .expect("feed connections failed"); - - // Every feed subscribes to the chain above to recv messages about it: - for (feed_tx, _) in &mut feeds { - feed_tx.send_command("subscribe", "Polkadot").unwrap(); - } - - // Start sending "update" messages from nodes at time intervals. - let bytes_in = Arc::new(AtomicUsize::new(0)); - let bytes_in2 = Arc::clone(&bytes_in); - tokio::task::spawn(async move { - let msg = json!({ - "id":1, - "payload":{ - "bandwidth_download":576, - "bandwidth_upload":576, - "msg":"system.interval", - "peers":1 - }, - "ts":"2021-07-12T10:37:48.330433+01:00" - }); - let msg_bytes: &'static [u8] = Box::new(serde_json::to_vec(&msg).unwrap()).leak(); - - loop { - // every ~1second we aim to have sent messages from all of the nodes. So we cycle through - // the node IDs and send a message from each at roughly 1s / number_of_nodes. - let mut interval = - tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64)); - - for node_id in (0..nodes.len()).cycle() { - interval.tick().await; - let node_tx = &mut nodes[node_id].0; - node_tx - .unbounded_send(SentMessage::StaticBinary(msg_bytes)) - .unwrap(); - bytes_in2.fetch_add(msg_bytes.len(), Ordering::Relaxed); - } - } - }); - - // Also start receiving messages, counting the bytes received so far. - let bytes_out = Arc::new(AtomicUsize::new(0)); - let msgs_out = Arc::new(AtomicUsize::new(0)); - for (_, mut feed_rx) in feeds { - let bytes_out = Arc::clone(&bytes_out); - let msgs_out = Arc::clone(&msgs_out); - tokio::task::spawn(async move { - while let Some(msg) = feed_rx.next().await { - let msg = msg.expect("message could be received"); - let num_bytes = msg.len(); - bytes_out.fetch_add(num_bytes, Ordering::Relaxed); - msgs_out.fetch_add(1, Ordering::Relaxed); - } - eprintln!("Error: feed has been closed unexpectedly"); - }); - } - - // Periodically report on bytes out - tokio::task::spawn(async move { - let one_mb = 1024.0 * 1024.0; - let mut last_bytes_in = 0; - let mut last_bytes_out = 0; - let mut last_msgs_out = 0; - let mut n = 1; - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - let bytes_in_val = bytes_in.load(Ordering::Relaxed); - let bytes_out_val = bytes_out.load(Ordering::Relaxed); - let msgs_out_val = msgs_out.load(Ordering::Relaxed); - - println!( - "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})", - n, - (bytes_in_val - last_bytes_in) as f64 / one_mb, - (bytes_out_val - last_bytes_out) as f64 / one_mb, - bytes_in_val, - bytes_out_val, - (msgs_out_val - last_msgs_out), - msgs_out_val - ); - - n += 1; - last_bytes_in = bytes_in_val; - last_bytes_out = bytes_out_val; - last_msgs_out = msgs_out_val; - } - }); - - // Wait forever. - future::pending().await -} - -/// Identical to `soak_test`, except that we try to send realistic messages from fake nodes. -/// This means it's potentially less reproducable, but presents a more accurate picture of -/// the load, and lets us see the UI working more or less. -/// -/// We can provide the same arguments as we would to `soak_test`: -/// -/// ```sh -/// SOAK_TEST_ARGS='--feeds 10 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture -/// ``` -/// -/// You can also run this test against the pre-sharding actix binary with something like this: -/// ```sh -/// TELEMETRY_BIN=~/old_telemetry_binary SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture -/// ``` -/// -/// Or, you can run it against existing processes with something like this: -/// ```sh -/// TELEMETRY_SUBMIT_HOSTS='127.0.0.1:8001' TELEMETRY_FEED_HOST='127.0.0.1:8000' SOAK_TEST_ARGS='--feeds 100 --nodes 100 --shards 4' cargo test --release -- realistic_soak_test --ignored --nocapture -/// ``` -/// -#[ignore] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -pub async fn realistic_soak_test() { - let opts = get_soak_test_opts(); - run_realistic_soak_test(opts).await; + tokio::runtime::Builder::new_multi_thread() + .worker_threads(opts.test_worker_threads) + .enable_all() + .thread_name("telemetry_test_runner") + .build() + .unwrap() + .block_on(run_soak_test(opts)); } /// A general soak test runner. /// This test sends realistic messages from connected nodes /// so that we can see how things react under more normal /// circumstances -async fn run_realistic_soak_test(opts: SoakTestOpts) { +async fn run_soak_test(opts: SoakTestOpts) { let mut server = start_server( ServerOpts { release_mode: true, @@ -300,30 +117,34 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { // Start nodes talking to the shards: let bytes_in = Arc::new(AtomicUsize::new(0)); + let ids_per_node = opts.ids_per_node; for node in nodes.into_iter().enumerate() { - let bytes_in = Arc::clone(&bytes_in); - tokio::spawn(async move { - let (idx, (tx, _)) = node; + let (idx, (tx, _)) = node; + for id in 0..ids_per_node { + let bytes_in = Arc::clone(&bytes_in); + let tx = tx.clone(); - let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( - Duration::from_secs(3), - format!("Node {}", idx + 1), - "Polkadot".to_owned(), - idx + 1, - ); + tokio::spawn(async move { + let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( + Duration::from_secs(3), + format!("Node {}", idx + 1), + "Polkadot".to_owned(), + id + 1, + ); - let res = telemetry - .start(|msg| async { - bytes_in.fetch_add(msg.len(), Ordering::Relaxed); - tx.unbounded_send(SentMessage::Binary(msg))?; - Ok::<_, anyhow::Error>(()) - }) - .await; + let res = telemetry + .start(|msg| async { + bytes_in.fetch_add(msg.len(), Ordering::Relaxed); + tx.unbounded_send(SentMessage::Binary(msg))?; + Ok::<_, anyhow::Error>(()) + }) + .await; - if let Err(e) = res { - log::error!("Telemetry Node #{} has died with error: {}", idx, e); - } - }); + if let Err(e) = res { + log::error!("Telemetry Node #{} has died with error: {}", idx, e); + } + }); + } } // Connect feeds to the core: @@ -404,6 +225,9 @@ struct SoakTestOpts { /// The number of nodes to connect to each feed #[structopt(long)] nodes: usize, + /// The number of different virtual nodes to connect per actual node socket connection + #[structopt(long, default_value = "1")] + ids_per_node: usize, /// Number of aggregator loops to use in the core #[structopt(long)] core_num_aggregators: Option, @@ -416,6 +240,9 @@ struct SoakTestOpts { /// Should we log output from the core/shards to stdout? #[structopt(long)] log_output: bool, + /// How many worker threads should the soak test runner use? + #[structopt(long, default_value = "4")] + test_worker_threads: usize, } /// Get soak test args from an envvar and parse them via structopt. diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml index 2f9d505..370d739 100644 --- a/backend/telemetry_shard/Cargo.toml +++ b/backend/telemetry_shard/Cargo.toml @@ -9,6 +9,7 @@ license = "GPL-3.0" anyhow = "1.0.41" bincode = "1.3.3" common = { path = "../common" } +flume = "0.10.8" futures = "0.3.15" hex = "0.4.3" http = "0.2.4" diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 1200240..50ded1d 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -21,8 +21,7 @@ use common::{ node_types::BlockHash, AssignId, }; -use futures::channel::mpsc; -use futures::{Sink, SinkExt, StreamExt}; +use futures::{Sink, SinkExt}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU64; use std::sync::Arc; @@ -60,7 +59,7 @@ pub enum FromWebsocket { /// the websocket connection and force the node to reconnect /// so that it sends its system info again incase the telemetry /// core has restarted. - close_connection: mpsc::Sender<()>, + close_connection: flume::Sender<()>, }, /// Tell the aggregator about a new node. Add { @@ -94,28 +93,28 @@ struct AggregatorInternal { /// Send messages to the aggregator from websockets via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::Sender, + tx_to_aggregator: flume::Sender, } impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result { - let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); + let (tx_to_aggregator, rx_from_external) = flume::bounded(10); // Establish a resiliant connection to the core (this retries as needed): - let (tx_to_telemetry_core, mut rx_from_telemetry_core) = + let (tx_to_telemetry_core, rx_from_telemetry_core) = create_ws_connection_to_core(telemetry_uri).await; // Forward messages from the telemetry core into the aggregator: - let mut tx_to_aggregator2 = tx_to_aggregator.clone(); + let tx_to_aggregator2 = tx_to_aggregator.clone(); tokio::spawn(async move { - while let Some(msg) = rx_from_telemetry_core.next().await { + while let Ok(msg) = rx_from_telemetry_core.recv_async().await { let msg_to_aggregator = match msg { Message::Connected => ToAggregator::ConnectedToTelemetryCore, Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore, Message::Data(data) => ToAggregator::FromTelemetryCore(data), }; - if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await { + if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await { // This will close the ws channels, which themselves log messages. break; } @@ -139,8 +138,8 @@ impl Aggregator { // in to the aggregator. If nobody is holding the tx side of the channel // any more, this task will gracefully end. async fn handle_messages( - mut rx_from_external: mpsc::Receiver, - mut tx_to_telemetry_core: mpsc::Sender, + rx_from_external: flume::Receiver, + tx_to_telemetry_core: flume::Sender, ) { use internal_messages::{FromShardAggregator, FromTelemetryCore}; @@ -150,7 +149,7 @@ impl Aggregator { // A list of close channels for the currently connected substrate nodes. Send an empty // tuple to these to ask the connections to be closed. - let mut close_connections: HashMap> = HashMap::new(); + let mut close_connections: HashMap> = HashMap::new(); // Maintain mappings from the connection ID and node message ID to the "local ID" which we // broadcast to the telemetry core. @@ -160,15 +159,15 @@ impl Aggregator { let mut muted: HashSet = HashSet::new(); // Now, loop and receive messages to handle. - while let Some(msg) = rx_from_external.next().await { + while let Ok(msg) = rx_from_external.recv_async().await { match msg { ToAggregator::ConnectedToTelemetryCore => { // Take hold of the connection closers and run them all. let closers = close_connections; - for (_, mut closer) in closers { + for (_, closer) in closers { // if this fails, it probably means the connection has died already anyway. - let _ = closer.send(()).await; + let _ = closer.send_async(()).await; } // We've told everything to disconnect. Now, reset our state: @@ -212,7 +211,7 @@ impl Aggregator { // Send the message to the telemetry core with this local ID: let _ = tx_to_telemetry_core - .send(FromShardAggregator::AddNode { + .send_async(FromShardAggregator::AddNode { ip, node, genesis_hash, @@ -245,7 +244,7 @@ impl Aggregator { // Send the message to the telemetry core with this local ID: let _ = tx_to_telemetry_core - .send(FromShardAggregator::UpdateNode { local_id, payload }) + .send_async(FromShardAggregator::UpdateNode { local_id, payload }) .await; } ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => { @@ -264,7 +263,7 @@ impl Aggregator { to_local_id.remove_by_id(local_id); muted.remove(&local_id); let _ = tx_to_telemetry_core - .send(FromShardAggregator::RemoveNode { local_id }) + .send_async(FromShardAggregator::RemoveNode { local_id }) .await; } } @@ -293,6 +292,7 @@ impl Aggregator { // but pinning by boxing is the easy solution for now: Box::pin( tx_to_aggregator + .into_sink() .with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }), ) } diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs index 4ebf39d..4b24bee 100644 --- a/backend/telemetry_shard/src/connection.rs +++ b/backend/telemetry_shard/src/connection.rs @@ -16,8 +16,7 @@ use bincode::Options; use common::ws_client; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; +use futures::StreamExt; #[derive(Clone, Debug)] pub enum Message { @@ -36,13 +35,13 @@ pub enum Message { /// between aggregator and core. pub async fn create_ws_connection_to_core( telemetry_uri: http::Uri, -) -> (mpsc::Sender, mpsc::Receiver>) +) -> (flume::Sender, flume::Receiver>) where In: serde::Serialize + Send + 'static, Out: serde::de::DeserializeOwned + Send + 'static, { - let (tx_in, mut rx_in) = mpsc::channel(10); - let (mut tx_out, rx_out) = mpsc::channel(10); + let (tx_in, rx_in) = flume::bounded::(10); + let (tx_out, rx_out) = flume::bounded(10); let mut is_connected = false; @@ -51,7 +50,7 @@ where // Throw away any pending messages from the incoming channel so that it // doesn't get filled up and begin blocking while we're looping and waiting // for a reconnection. - while let Ok(Some(_)) = rx_in.try_next() {} + while let Ok(_) = rx_in.try_recv() {} // Try to connect. If connection established, we serialize and forward messages // to/from the core. If the external channels break, we end for good. If the internal @@ -60,9 +59,9 @@ where Ok(connection) => { let (tx_to_core, mut rx_from_core) = connection.into_channels(); is_connected = true; - let mut tx_out = tx_out.clone(); + let tx_out = tx_out.clone(); - if let Err(e) = tx_out.send(Message::Connected).await { + if let Err(e) = tx_out.send_async(Message::Connected).await { // If receiving end is closed, bail now. log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); return; @@ -73,35 +72,31 @@ where tokio::select! { msg = rx_from_core.next() => { let msg = match msg { - Some(msg) => msg, + Some(Ok(msg)) => msg, // No more messages from core? core WS is disconnected. - None => { + _ => { log::warn!("No more messages from core: shutting down connection (will reconnect)"); break } }; let bytes = match msg { - Ok(ws_client::RecvMessage::Binary(bytes)) => bytes, - Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(), - Err(e) => { - log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e); - break; - } + ws_client::RecvMessage::Binary(bytes) => bytes, + ws_client::RecvMessage::Text(s) => s.into_bytes() }; let msg = bincode::options() .deserialize(&bytes) .expect("internal messages must be deserializable"); - if let Err(e) = tx_out.send(Message::Data(msg)).await { + if let Err(e) = tx_out.send_async(Message::Data(msg)).await { log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); return; } }, - msg = rx_in.next() => { + msg = rx_in.recv_async() => { let msg = match msg { - Some(msg) => msg, - None => { + Ok(msg) => msg, + Err(flume::RecvError::Disconnected) => { log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)"); return } @@ -131,7 +126,7 @@ where if is_connected { is_connected = false; - if let Err(e) = tx_out.send(Message::Disconnected).await { + if let Err(e) = tx_out.send_async(Message::Disconnected).await { log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); return; } diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 4e62d76..86f632c 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -29,7 +29,7 @@ use common::byte_size::ByteSize; use common::http_utils; use common::node_message; use common::rolling_total::RollingTotalBuilder; -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::SinkExt; use http::Uri; use hyper::{Method, Response}; use simple_logger::SimpleLogger; @@ -203,7 +203,7 @@ where // This could be a oneshot channel, but it's useful to be able to clone // messages, and we can't clone oneshot channel senders. - let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0); + let (close_connection_tx, close_connection_rx) = flume::bounded(1); // Tell the aggregator about this new connection, and give it a way to close this connection: let init_msg = FromWebsocket::Initialize { @@ -223,7 +223,7 @@ where tokio::select! { // The close channel has fired, so end the loop. `ws_recv.receive_data` is // *not* cancel safe, but since we're closing the connection we don't care. - _ = close_connection_rx.next() => { + _ = close_connection_rx.recv_async() => { log::info!("connection to {:?} being closed by aggregator", real_addr); break }, diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml index e3800c7..7dccdae 100644 --- a/backend/test_utils/Cargo.toml +++ b/backend/test_utils/Cargo.toml @@ -18,3 +18,4 @@ tokio = { version = "1.7.1", features = ["full"] } tokio-util = { version = "0.6.7", features = ["full"] } common = { path = "../common" } time = { version = "0.3.0", features = ["formatting"] } +flume = "0.10.8" diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index f130f7e..047de88 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -21,7 +21,7 @@ use std::{ use crate::feed_message_de::FeedMessage; use common::ws_client; -use futures::{Sink, SinkExt, Stream, StreamExt}; +use futures::{Stream, StreamExt}; /// Wrap a `ws_client::Sender` with convenient utility methods for shard connections pub struct ShardSender(ws_client::Sender); @@ -32,45 +32,20 @@ impl From for ShardSender { } } -impl Sink for ShardSender { - type Error = ws_client::SendError; - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_ready_unpin(cx) - } - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: ws_client::SentMessage, - ) -> Result<(), Self::Error> { - self.0.start_send_unpin(item) - } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_flush_unpin(cx) - } - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_close_unpin(cx) - } -} - impl ShardSender { /// Send JSON as a binary websocket message pub fn send_json_binary( &mut self, json: serde_json::Value, - ) -> Result<(), ws_client::SendError> { + ) -> Result<(), flume::SendError> { let bytes = serde_json::to_vec(&json).expect("valid bytes"); self.unbounded_send(ws_client::SentMessage::Binary(bytes)) } /// Send JSON as a textual websocket message - pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { + pub fn send_json_text( + &mut self, + json: serde_json::Value, + ) -> Result<(), flume::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.unbounded_send(ws_client::SentMessage::Text(s)) } @@ -128,34 +103,6 @@ impl From for FeedSender { } } -impl Sink for FeedSender { - type Error = ws_client::SendError; - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_ready_unpin(cx) - } - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: ws_client::SentMessage, - ) -> Result<(), Self::Error> { - self.0.start_send_unpin(item) - } - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_flush_unpin(cx) - } - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.0.poll_close_unpin(cx) - } -} - impl Deref for FeedSender { type Target = ws_client::Sender; fn deref(&self) -> &Self::Target { @@ -176,7 +123,7 @@ impl FeedSender { &self, command: S, param: S, - ) -> Result<(), ws_client::SendError> { + ) -> Result<(), flume::SendError> { self.unbounded_send(ws_client::SentMessage::Text(format!( "{}:{}", command.as_ref(),