diff --git a/backend/telemetry_core/benches/throughput.rs b/backend/telemetry_core/benches/throughput.rs index 27691fa..7c98af4 100644 --- a/backend/telemetry_core/benches/throughput.rs +++ b/backend/telemetry_core/benches/throughput.rs @@ -1,3 +1,6 @@ +use std::iter::FromIterator; + +use futures::StreamExt; use test_utils::workspace::start_server_release; use criterion::{criterion_group, criterion_main, Criterion}; use tokio::runtime::Runtime; @@ -39,27 +42,41 @@ pub fn benchmark_throughput_single_shard(c: &mut Criterion) { } })).await.unwrap(); } - +tokio::time::sleep(std::time::Duration::from_millis(500)).await; // Start 1000 feeds: let mut feeds = server .get_core() - .connect_multiple(1000) + .connect_multiple(1) .await .expect("feeds can connect"); - // Subscribe all feeds to the chain: - for (feed_tx, _) in feeds.iter_mut() { - feed_tx.send_command("subscribe", "Local Testnet").await.unwrap(); - } + // // Subscribe all feeds to the chain: + // for (feed_tx, _) in feeds.iter_mut() { + // feed_tx.send_command("subscribe", "Local Testnet").await.unwrap(); + // } - // Consume any messages feeds have received so far: - let feed_consumers = feeds - .iter_mut() - .map(|(_,rx)| rx.recv_feed_messages()); - futures::future::join_all(feed_consumers).await; +println!("consuming feed"); +{ - tokio::time::sleep(std::time::Duration::from_secs(100)).await; + let mut msgs = futures::stream::FuturesUnordered::from_iter( + feeds + .iter_mut() + .map(|(_,rx)| rx.recv_feed_messages()) + ); + let mut n = 0; + while let Some(Ok(msg)) = msgs.next().await { + n += 1; + println!("Message {}: {:?}", n, msg); + } +} + + // // Consume any messages feeds have received so far (every feed should havea few at least): + // let feed_consumers = feeds + // .iter_mut() + // .map(|(_,rx)| rx.next()); + // futures::future::join_all(feed_consumers).await; +println!("feed consumed"); (nodes, feeds) }); diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs 
b/backend/telemetry_core/src/aggregator/aggregator.rs index e04a419..7e9a4e1 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -28,13 +28,13 @@ struct AggregatorInternal { /// Send messages in to the aggregator from the outside via this. This is /// stored here so that anybody holding an `Aggregator` handle can /// make use of it. - tx_to_aggregator: mpsc::Sender, + tx_to_aggregator: mpsc::UnboundedSender, } impl Aggregator { /// Spawn a new Aggregator. This connects to the telemetry backend pub async fn spawn(denylist: Vec) -> anyhow::Result { - let (tx_to_aggregator, rx_from_external) = mpsc::channel(10); + let (tx_to_aggregator, rx_from_external) = mpsc::unbounded(); // Kick off a locator task to locate nodes, which hands back a channel to make location requests let tx_to_locator = find_location(tx_to_aggregator.clone().with(|(node_id, msg)| { @@ -62,7 +62,7 @@ impl Aggregator { // in to the aggregator. If nobody is tolding the tx side of the channel // any more, this task will gracefully end. async fn handle_messages( - rx_from_external: mpsc::Receiver, + rx_from_external: mpsc::UnboundedReceiver, tx_to_aggregator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec, ) { diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index f645ceb..b3b7786 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -10,7 +10,7 @@ use common::{ time, }; use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; +use futures::StreamExt; use std::collections::{HashMap, HashSet}; use std::{ net::{IpAddr, Ipv4Addr}, @@ -31,7 +31,7 @@ pub enum FromShardWebsocket { /// When the socket is opened, it'll send this first /// so that we have a way to communicate back to it. 
Initialize { - channel: mpsc::Sender, + channel: mpsc::UnboundedSender, }, /// Tell the aggregator about a new node. Add { @@ -112,7 +112,7 @@ pub enum ToFeedWebsocket { /// outgoing messages in the main aggregator loop. pub struct InnerLoop { /// Messages from the outside world come into this: - rx_from_external: mpsc::Receiver, + rx_from_external: mpsc::UnboundedReceiver, /// The state of our chains and nodes lives here: node_state: State, @@ -123,7 +123,7 @@ pub struct InnerLoop { /// Keep track of how to send messages out to feeds. feed_channels: HashMap>, /// Keep track of how to send messages out to shards. - shard_channels: HashMap>, + shard_channels: HashMap>, /// Which chain is a feed subscribed to? /// Feed Connection ID -> Chain Genesis Hash @@ -142,7 +142,7 @@ pub struct InnerLoop { impl InnerLoop { /// Create a new inner loop handler with the various state it needs. pub fn new( - rx_from_external: mpsc::Receiver, + rx_from_external: mpsc::UnboundedReceiver, tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>, denylist: Vec, ) -> Self { @@ -159,25 +159,27 @@ impl InnerLoop { } } - /// Start handling and responding to incoming messages. + /// Start handling and responding to incoming messages. Owing to unbounded channels, we actually + /// only have a single `.await` (in this function). This helps to make it clear that the aggregator loop + /// will be able to make progress quickly without any potential yield points. 
pub async fn handle(mut self) { while let Some(msg) = self.rx_from_external.next().await { match msg { ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => { - self.handle_from_feed(feed_conn_id, msg).await + self.handle_from_feed(feed_conn_id, msg) } ToAggregator::FromShardWebsocket(shard_conn_id, msg) => { - self.handle_from_shard(shard_conn_id, msg).await + self.handle_from_shard(shard_conn_id, msg) } ToAggregator::FromFindLocation(node_id, location) => { - self.handle_from_find_location(node_id, location).await + self.handle_from_find_location(node_id, location) } } } } /// Handle messages that come from the node geographical locator. - async fn handle_from_find_location( + fn handle_from_find_location( &mut self, node_id: NodeId, location: find_location::Location, @@ -209,7 +211,7 @@ impl InnerLoop { } /// Handle messages coming from shards. - async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) { + fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) { log::debug!("Message from shard ({:?}): {:?}", shard_conn_id, msg); match msg { @@ -226,21 +228,19 @@ impl InnerLoop { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { let _ = shard_conn - .send(ToShardWebsocket::Mute { + .unbounded_send(ToShardWebsocket::Mute { local_id, reason: MuteReason::ChainNotAllowed, - }) - .await; + }); } } state::AddNodeResult::ChainOverQuota => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { let _ = shard_conn - .send(ToShardWebsocket::Mute { + .unbounded_send(ToShardWebsocket::Mute { local_id, reason: MuteReason::Overquota, - }) - .await; + }); } } state::AddNodeResult::NodeAddedToChain(details) => { @@ -295,7 +295,7 @@ impl InnerLoop { return; } }; - self.remove_nodes_and_broadcast_result(Some(node_id)).await; + self.remove_nodes_and_broadcast_result(Some(node_id)); } FromShardWebsocket::Update { local_id, payload } => { let 
node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) { @@ -340,17 +340,16 @@ impl InnerLoop { .collect(); // ... and remove them: - self.remove_nodes_and_broadcast_result(node_ids_to_remove) - .await; + self.remove_nodes_and_broadcast_result(node_ids_to_remove); } } } /// Handle messages coming from feeds. - async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) { + fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) { log::debug!("Message from feed ({:?}): {:?}", feed_conn_id, msg); match msg { - FromFeedWebsocket::Initialize { mut channel } => { + FromFeedWebsocket::Initialize { channel } => { self.feed_channels.insert(feed_conn_id, channel.clone()); // Tell the new feed subscription some basic things to get it going: @@ -363,7 +362,7 @@ impl InnerLoop { // Send this to the channel that subscribed: if let Some(bytes) = feed_serializer.into_finalized() { - let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await; + let _ = channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); } } FromFeedWebsocket::Ping { value } => { @@ -474,7 +473,7 @@ impl InnerLoop { } /// Remove all of the node IDs provided and broadcast messages to feeds as needed. 
- async fn remove_nodes_and_broadcast_result( + fn remove_nodes_and_broadcast_result( &mut self, node_ids: impl IntoIterator, ) { diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index aff1b1f..bf45160 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -119,7 +119,7 @@ async fn handle_shard_websocket_connection( where S: futures::Sink + Unpin, { - let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::channel(10); + let (tx_to_shard_conn, mut rx_from_aggregator) = mpsc::unbounded(); // Tell the aggregator about this new connection, and give it a way to send messages to us: let init_msg = FromShardWebsocket::Initialize { diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index 6b1178b..5678deb 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -1,12 +1,15 @@ use common::node_types::BlockHash; +use futures::StreamExt; use serde_json::json; -use std::time::Duration; +use std::{iter::FromIterator, time::Duration}; use test_utils::{ assert_contains_matches, feed_message_de::{FeedMessage, NodeDetails}, workspace::start_server_debug }; +/// The simplest test we can run; the main benefit of this test (since we check similar things +/// below) is just to give a feel for _how_ we can test basic feed related things. #[tokio::test] async fn feed_sent_version_on_connect() { let server = start_server_debug().await; @@ -26,6 +29,8 @@ async fn feed_sent_version_on_connect() { server.shutdown().await; } +/// Another very simple test: pings from feeds should be responded to by pongs +/// with the same message content.
#[tokio::test] async fn feed_ping_responded_to_with_pong() { let server = start_server_debug().await; @@ -49,6 +54,109 @@ async fn feed_ping_responded_to_with_pong() { server.shutdown().await; } + +/// As a prelude to `lots_of_mute_messages_dont_cause_a_deadlock`, we can check that +/// a lot of nodes can simultaneously subscribe and are all sent the expected response. +#[tokio::test] +async fn multiple_feeds_sent_version_on_connect() { + let server = start_server_debug().await; + + // Connect a bunch of feeds: + let mut feeds = server + .get_core() + .connect_multiple(1000) + .await + .unwrap(); + + // Wait for responses all at once: + let responses = futures::future::join_all( + feeds.iter_mut() + .map(|(_, rx)| rx.recv_feed_messages()) + ); + + let responses = tokio::time::timeout(Duration::from_secs(10), responses) + .await + .expect("we shouldn't hit a timeout waiting for responses"); + + // Expect a version response of 31 to all of them: + for feed_messages in responses { + assert_eq!( + feed_messages.expect("should have messages"), + vec![FeedMessage::Version(31)], + "expecting version" + ); + } + + // Tidy up: + server.shutdown().await; +} + +/// When a lot (> ~700 in this case) of nodes are added, the chain becomes overquota. +/// this leads to a load of messages being sent back to the shard. If bounded channels +/// are used to send messages back to the shard, it's possible that we get into a situation +/// where the shard is waiting trying to send the next "add node" message, while the +/// telemetry core is waiting trying to send up to the shard the next "mute node" message, +/// resulting in a deadlock. This test gives confidence that we don't run into such a deadlock. 
+#[tokio::test] +async fn lots_of_mute_messages_dont_cause_a_deadlock() { + let mut server = start_server_debug().await; + let shard_id = server.add_shard().await.unwrap(); + + // Connect 2000 nodes to the shard: + let mut nodes = server + .get_shard(shard_id) + .unwrap() + .connect_multiple(2000) // 1500 of these will be overquota. + .await + .expect("nodes can connect"); + + // Every node announces itself on the same chain: + for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { + node_tx.send_json_text(json!({ + "id":1, // message ID, not node ID. Can be the same for all. + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Local Testnet", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Alice {}", idx), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })).await.unwrap(); + } + + // Wait a little time (just to let everything get deadlocked) before + // trying to have the aggregator send out feed messages. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // Start a bunch of feeds. If deadlock has happened, none of them will + // receive any messages back. + let mut feeds = server + .get_core() + .connect_multiple(1) + .await + .expect("feeds can connect"); + + // Wait to see whether we get anything back: + let msgs_fut = futures::future::join_all( + feeds + .iter_mut() + .map(|(_,rx)| rx.recv_feed_messages()) + ); + + // Give up after a timeout: + tokio::time::timeout(Duration::from_secs(10), msgs_fut) + .await + .expect("should not hit timeout waiting for messages (deadlock has happened)"); +} + +/// If a node is added, a connecting feed should be told about the new chain. +/// If the node is removed, the feed should be told that the chain has gone.
#[tokio::test] async fn feed_add_and_remove_node() { // Connect server and add shard @@ -111,6 +219,90 @@ async fn feed_add_and_remove_node() { server.shutdown().await; } +/// If nodes connect and the chain name changes, feeds will be told about this +/// and will keep receiving messages about the renamed chain (despite subscribing +/// to it by name). +#[tokio::test] +async fn feeds_told_about_chain_rename_and_stay_subscribed() { + // Connect a node: + let mut server = start_server_debug().await; + let shard_id = server.add_shard().await.unwrap(); + let (mut node_tx, _node_rx) = server + .get_shard(shard_id) + .unwrap() + .connect() + .await + .expect("can connect to shard"); + + let node_init_msg = |id, chain_name: &str, node_name: &str| json!({ + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain": chain_name, + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": node_name, + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + }); + + // Subscribe a chain: + node_tx.send_json_text(node_init_msg(1, "Initial chain name", "Node 1")).await.unwrap(); + + // Connect a feed and subscribe to the above chain: + let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + feed_tx.send_command("subscribe", "Initial chain name").await.unwrap(); + + // Feed is told about the chain, and the node on this chain: + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_contains_matches!( + feed_messages, + FeedMessage::AddedChain { name, node_count: 1 } if name == "Initial chain name", + FeedMessage::SubscribedTo { name } if name == "Initial chain name", + FeedMessage::AddedNode { node: NodeDetails { name: node_name, .. }, ..} if node_name == "Node 1", + ); + + // Subscribe another node. 
The chain doesn't rename yet but we are told about the new node + // count and the node that's been added. + node_tx.send_json_text(node_init_msg(2, "New chain name", "Node 2")).await.unwrap(); + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_contains_matches!( + feed_messages, + FeedMessage::AddedNode { node: NodeDetails { name: node_name, .. }, ..} if node_name == "Node 2", + FeedMessage::AddedChain { name, node_count: 2 } if name == "Initial chain name", + ); + + // Subscribe a third node. The chain renames, so we're told about the new node but also + // about the chain rename. + node_tx.send_json_text(node_init_msg(3, "New chain name", "Node 3")).await.unwrap(); + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_contains_matches!( + feed_messages, + FeedMessage::AddedNode { node: NodeDetails { name: node_name, .. }, ..} if node_name == "Node 3", + FeedMessage::RemovedChain { name } if name == "Initial chain name", + FeedMessage::AddedChain { name, node_count: 3 } if name == "New chain name", + ); + + // Just to be sure, subscribing a fourth node on this chain will still lead to updates + // to this feed. + node_tx.send_json_text(node_init_msg(4, "New chain name", "Node 4")).await.unwrap(); + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_contains_matches!( + feed_messages, + FeedMessage::AddedNode { node: NodeDetails { name: node_name, .. }, ..} if node_name == "Node 4", + FeedMessage::AddedChain { name, node_count: 4 } if name == "New chain name", + ); + +} + +/// If we add a couple of shards and a node for each, all feeds should be +/// told about both node chains. If one shard goes away, we should get a +/// "removed chain" message only for the node connected to that shard. 
#[tokio::test] async fn feed_add_and_remove_shard() { let mut server = start_server_debug().await; @@ -130,24 +322,22 @@ async fn feed_add_and_remove_shard() { // Send a "system connected" message: node_tx - .send_json_text(json!( - { - "id":id, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain": format!("Local Testnet {}", id), - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(id), - "implementation":"Substrate Node", - "msg":"system.connected", - "name":"Alice", - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - } - )) + .send_json_text(json!({ + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain": format!("Local Testnet {}", id), + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(id), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":"Alice", + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + })) .await .unwrap(); @@ -188,6 +378,8 @@ async fn feed_add_and_remove_shard() { server.shutdown().await; } +/// feeds can subscribe to one chain at a time. They should get the relevant +/// messages for that chain and no other. #[tokio::test] async fn feed_can_subscribe_and_unsubscribe_from_chain() { use FeedMessage::*; diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index b23e899..e6731f9 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -145,7 +145,7 @@ impl Aggregator { for (_, mut closer) in closers { // if this fails, it probably means the connection has died already anyway. - let _ = closer.send(()); + let _ = closer.send(()).await; } // We've told everything to disconnect. 
Now, reset our state: diff --git a/backend/test_utils/src/feed_message_de.rs b/backend/test_utils/src/feed_message_de.rs index 930e0eb..08e28cb 100644 --- a/backend/test_utils/src/feed_message_de.rs +++ b/backend/test_utils/src/feed_message_de.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use common::node_types::{ BlockDetails, BlockHash, BlockNumber, NodeLocation, NodeStats, Timestamp, }; @@ -22,7 +23,7 @@ pub enum FeedMessage { // io: NodeIO, // can't losslessly deserialize // hardware: NodeHardware, // can't losslessly deserialize block_details: BlockDetails, - location: NodeLocation, + location: Option, startup_time: Option, }, RemovedNode { @@ -127,15 +128,18 @@ impl FeedMessage { for raw_keyval in v.chunks(2) { let raw_key = raw_keyval[0]; let raw_val = raw_keyval[1]; - feed_messages.push(FeedMessage::decode(raw_key, raw_val)?); + let action: u8 = serde_json::from_str(raw_key.get())?; + let msg = FeedMessage::decode(action, raw_val) + .with_context(|| format!("Failed to decode message with action {}", action))?; + + feed_messages.push(msg); } Ok(feed_messages) } // Deserialize the feed message to a value based on the "action" key - fn decode(raw_key: &RawValue, raw_val: &RawValue) -> Result { - let action: u8 = serde_json::from_str(raw_key.get())?; + fn decode(action: u8, raw_val: &RawValue) -> Result { let feed_message = match action { // Version: 0 => { diff --git a/backend/test_utils/src/server/utils.rs b/backend/test_utils/src/server/utils.rs index ed22991..0a8e2bf 100644 --- a/backend/test_utils/src/server/utils.rs +++ b/backend/test_utils/src/server/utils.rs @@ -65,15 +65,15 @@ pub async fn connect_multiple_to_uri( uri: &http::Uri, num_connections: usize, ) -> Result, ws_client::ConnectError> { - - // Batch connection establishing to groups of 100 at a time; I found while benchmarking that - // I'd run into "connection reset by peer" issues trying to establish more at once. 
- let connect_futs = (0..num_connections).map(|_| ws_client::connect(uri)); - let sockets: Result, _> = futures::future::join_all(connect_futs) - .await - .into_iter() - .collect(); - sockets + // Previous versions of this used future::join_all to concurrently establish all of the + // connections we want. However, trying to do that with more than say ~130 connections on + // MacOS led to hitting "Connection reset by peer" errors, so let's do it one-at-a-time. + // (Side note: on Ubuntu the concurrency seemed to be no issue up to at least 10k connections). + let mut sockets = vec![]; + for _ in 0..num_connections { + sockets.push(ws_client::connect(uri).await?); + } + Ok(sockets) } /// Drain output from a reader to stdout. After acquiring port details from spawned processes,