diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 5167e55..dab8fa7 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1585,6 +1585,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", + "slab", "tokio", ] diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index 32f90ff..747bd46 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -265,7 +265,6 @@ impl InnerLoop { &genesis_hash, feed_messages_for_chain, ); - // Tell everybody about the new node count and potential rename: let mut feed_messages_for_all = FeedMessageSerializer::new(); if has_chain_label_changed { @@ -376,7 +375,7 @@ impl InnerLoop { let mut feed_serializer = FeedMessageSerializer::new(); feed_serializer.push(feed_message::Pong(&value)); if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; + let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); } } FromFeedWebsocket::Subscribe { chain } => { @@ -430,7 +429,7 @@ impl InnerLoop { // and continue sending batches of 32 nodes a time over the wire subsequently if idx % 32 == 0 { if let Some(bytes) = feed_serializer.finalize() { - let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; + let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); } } feed_serializer.push(feed_message::AddedNode(chain_node_id, node)); @@ -444,7 +443,7 @@ impl InnerLoop { } } if let Some(bytes) = feed_serializer.into_finalized() { - let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await; + let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); } // Actually make a note of the new chain subsciption: diff --git a/backend/telemetry_core/tests/basic_tests.rs b/backend/telemetry_core/tests/basic_tests.rs index 7d0604f..e9248eb 100644 --- a/backend/telemetry_core/tests/basic_tests.rs +++ 
b/backend/telemetry_core/tests/basic_tests.rs @@ -1,29 +1,112 @@ -#![cfg(feature = "e2e")] +//! These only run when the "e2e" feature is set (eg `cargo test --features e2e`). +//! The rust IDE plugins may behave better if you comment out this line during development: #![cfg(feature = "e2e")] use test_utils::{feed_message_de::FeedMessage, server::Server}; -// use serde_json::json; +use serde_json::json; +use std::time::Duration; #[tokio::test] -async fn can_ping_feed() { - +async fn feed_sent_version_on_connect() { let server = Server::start_default() .await .expect("server could start"); - // Connect to the feed: - let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + // Connect a feed: + let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); // Expect a version response of 31: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_eq!(feed_messages, vec![FeedMessage::Version(31)], "expecting version"); + // Tidy up: + server.shutdown().await; +} + +#[tokio::test] +async fn ping_responded_to_with_pong() { + let server = Server::start_default() + .await + .expect("server could start"); + + // Connect a feed: + let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + // Ping it: feed_tx.send_command("ping", "hello!").await.unwrap(); // Expect a pong response: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); - assert_eq!(feed_messages, vec![FeedMessage::Pong { msg: "hello!".to_owned() }], "expecting pong"); + assert!(feed_messages.contains(&FeedMessage::Pong { msg: "hello!".to_owned() }), "Expecting pong"); // Tidy up: server.shutdown().await; } + +#[tokio::test] +async fn node_can_be_added_and_removed() { + let mut server = Server::start_default() + .await + .expect("server could start"); + + // Add a shard: + let shard_id = server.add_shard() + .await + .expect("shard could be added"); + + // Connect a node to the shard: + let (mut node_tx, _node_rx) = 
server.get_shard(shard_id) + .unwrap() + .connect() + .await + .expect("can connect to shard"); + + // Send a "system connected" message: + node_tx.send_json_text(json!( + { + "id":1, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Local Testnet", + "config":"", + "genesis_hash":"0x340358f3029f5211d20d6a1f4bbe3567b39dffd35ce0d4b358fa7c62ba3f5505", + "implementation":"Substrate Node", + "msg":"system.connected", + "name":"Alice", + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + )).await.unwrap(); + + // Wait a little for this message to propagate to the core + // (so that our feed connects after the core knows and not before). + tokio::time::sleep(Duration::from_millis(500)).await; + + // Connect a feed. + let (_feed_tx, mut feed_rx) = server.get_core().connect() + .await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert!(feed_messages.contains( + &FeedMessage::AddedChain { + name: "Local Testnet".to_owned(), + node_count: 1 + } + )); + + // Disconnect the node: + node_tx.close().await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert!(feed_messages.contains( + &FeedMessage::RemovedChain { + name: "Local Testnet".to_owned(), + } + )); + + // Tidy up: + server.shutdown().await; +} \ No newline at end of file diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index f32c4be..4737166 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -74,7 +74,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { log::info!("Opening /submit connection from {:?}", addr); ws.on_upgrade(move |websocket| async move { let (mut tx_to_aggregator, websocket) = - handle_websocket_connection(websocket, tx_to_aggregator, addr).await; + handle_node_websocket_connection(websocket, 
tx_to_aggregator, addr).await; log::info!("Closing /submit connection from {:?}", addr); // Tell the aggregator that this connection has closed, so it can tidy up. let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; @@ -92,7 +92,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { } /// This takes care of handling messages from an established socket connection. -async fn handle_websocket_connection( +async fn handle_node_websocket_connection( mut websocket: ws::WebSocket, mut tx_to_aggregator: S, addr: Option, @@ -147,7 +147,6 @@ where if !msg.is_binary() && !msg.is_text() { continue; } - // Deserialize from JSON, warning if deserialization fails: let bytes = msg.as_bytes(); let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) { diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml index 02de8d0..590d227 100644 --- a/backend/test_utils/Cargo.toml +++ b/backend/test_utils/Cargo.toml @@ -15,5 +15,5 @@ serde_json = "1.0.64" soketto = "0.6.0" thiserror = "1.0.25" tokio = { version = "1.7.1", features = ["full"] } -tokio-util = "0.6.7" +tokio-util = { version = "0.6.7", features = ["full"] } common = { path = "../common" } diff --git a/backend/test_utils/src/feed_message_de.rs b/backend/test_utils/src/feed_message_de.rs index 28f78ac..7e020ce 100644 --- a/backend/test_utils/src/feed_message_de.rs +++ b/backend/test_utils/src/feed_message_de.rs @@ -121,7 +121,7 @@ impl FeedMessage { let v: Vec<&RawValue> = serde_json::from_slice(bytes)?; let mut feed_messages = vec![]; - for raw_keyval in v.windows(2) { + for raw_keyval in v.chunks(2) { let raw_key = raw_keyval[0]; let raw_val = raw_keyval[1]; feed_messages.push(FeedMessage::decode(raw_key, raw_val)?); @@ -274,4 +274,36 @@ impl FeedMessage { Ok(feed_message) } +} + +#[cfg(test)] +mod test { + + use super::*; + + #[test] + fn decode_remove_node_msg() { + // "remove chain ''": + let msg = r#"[12,""]"#; + + assert_eq!( + 
FeedMessage::from_bytes(msg.as_bytes()).unwrap(), + vec![FeedMessage::RemovedChain { name: "".to_owned() }] + ); + } + + #[test] + fn decode_remove_then_add_node_msg() { + // "remove chain '', then add chain 'Local Testnet' with 1 node": + let msg = r#"[12,"",11,["Local Testnet",1]]"#; + + assert_eq!( + FeedMessage::from_bytes(msg.as_bytes()).unwrap(), + vec![ + FeedMessage::RemovedChain { name: "".to_owned() }, + FeedMessage::AddedChain { name: "Local Testnet".to_owned(), node_count: 1 }, + ] + ); + } + } \ No newline at end of file diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index 11815b4..5404390 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::ws_client; use futures::{Sink, SinkExt, Stream, StreamExt}; use crate::feed_message_de::FeedMessage; @@ -9,6 +11,13 @@ impl From for ShardSender { fn from(c: ws_client::Sender) -> Self { ShardSender(c) } } +impl ShardSender { + /// Close this connection + pub async fn close(&mut self) -> Result<(),ws_client::SendError> { + self.0.close().await + } +} + impl Sink for ShardSender { type Error = ws_client::SendError; fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { @@ -97,7 +106,11 @@ impl Stream for FeedReceiver { impl FeedReceiver { /// Wait for the next set of feed messages to arrive. Returns an error if the connection /// is closed, or the messages that come back cannot be properly decoded. - pub async fn recv_feed_messages(&mut self) -> Result, anyhow::Error> { + /// + /// Prefer [`FeedReceiver::recv_feed_messages`]; tests should generally be + /// robust in assuming that messages may not all be delivered at once (unless we are + /// specifically testing which messages are buffered together). 
+ pub async fn recv_feed_messages_once(&mut self) -> Result, anyhow::Error> { let msg = self.0 .next() .await @@ -114,4 +127,28 @@ impl FeedReceiver { } } } + + /// Wait for feed messages to be sent back, building up a list of output messages until + /// the channel goes quiet for a short while. + pub async fn recv_feed_messages(&mut self) -> Result, anyhow::Error> { + // Block as long as needed for messages to start coming in: + let mut feed_messages = self.recv_feed_messages_once().await?; + // Then, loop a little to make sure we catch any additional messages that are sent soon after: + loop { + match tokio::time::timeout(Duration::from_millis(250), self.recv_feed_messages_once()).await { + // Timeout elapsed; return the messages we have so far + Err(_) => { + break Ok(feed_messages); + }, + // Append messages that come back to our vec + Ok(Ok(mut msgs)) => { + feed_messages.append(&mut msgs); + }, + // Error came back receiving messages; return it + Ok(Err(e)) => { + break Err(e) + } + } + } + } } \ No newline at end of file diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index e49579a..df57039 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -140,9 +140,19 @@ impl Server { .await .map_err(|e| Error::ErrorObtainingPort(e))?; + // Attempt to wait until we've received word that the shard is connected to the + // core before continuing. If we don't wait for this, the connection may happen + // after we've attempted to connect node sockets, and they would be booted and + // made to reconnect, which we don't want to deal with in general. 
+ let _ = utils::wait_for_line_containing( + &mut child_stdout, + "Connected to telemetry core", + std::time::Duration::from_secs(5) + ).await; + // Since we're piping stdout from the child process, we need somewhere for it to go // else the process will get stuck when it tries to produce output: - utils::drain(child_stdout, tokio::io::stdout()); + utils::drain(child_stdout, tokio::io::sink()); let shard_uri = format!("http://127.0.0.1:{}/submit", shard_port) .parse() @@ -189,7 +199,7 @@ impl Server { // Since we're piping stdout from the child process, we need somewhere for it to go // else the process will get stuck when it tries to produce output: - utils::drain(child_stdout, tokio::io::stdout()); + utils::drain(child_stdout, tokio::io::sink()); // URI for feeds to connect to the core: let feed_uri = format!("http://127.0.0.1:{}/feed", core_port) diff --git a/backend/test_utils/src/server/utils.rs b/backend/test_utils/src/server/utils.rs index 8dde9d8..38e6cbe 100644 --- a/backend/test_utils/src/server/utils.rs +++ b/backend/test_utils/src/server/utils.rs @@ -8,19 +8,34 @@ use anyhow::{ anyhow, Context }; /// with the side benefit that we'll wait for it to start listening before returning. We do this /// because we want to allow the kernel to assign ports and so don't specify a port as an arg. pub async fn get_port(reader: R) -> Result { + let expected_text = "listening on http://127.0.0.1:"; + wait_for_line_containing(reader, expected_text, Duration::from_secs(30)) + .await + .and_then(|line| { + let (_, port_str) = line.rsplit_once(expected_text).unwrap(); + port_str + .trim() + .parse() + .with_context(|| format!("Could not parse output to port: {}", port_str)) + }) +} + +/// Wait for a line of output containing the text given. Also provide a timeout, +/// such that if we don't see a new line of output within the timeout we bail out +/// and return an error. 
+pub async fn wait_for_line_containing(reader: R, text: &str, max_wait_between_lines: Duration) -> Result { let reader = BufReader::new(reader); let mut reader_lines = reader.lines(); loop { let line = tokio::time::timeout( - // This has to accomodate pauses during compilation if the cmd is "cargo run --": - Duration::from_secs(30), + max_wait_between_lines, reader_lines.next_line() ).await; let line = match line { // timeout expired; couldn't get port: - Err(e) => return Err(anyhow!("Timeout expired waiting to discover port: {}", e)), + Err(_) => return Err(anyhow!("Timeout expired waiting for output containing: {}", text)), // Something went wrong reading line; bail: Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)), // No more output; process ended? bail: @@ -29,15 +44,9 @@ pub async fn get_port(reader: R) -> Result line }; - let (_, port_str) = match line.rsplit_once("listening on http://127.0.0.1:") { - Some(m) => m, - None => continue - }; - - return port_str - .trim() - .parse() - .with_context(|| format!("Could not parse output to port: {}", port_str)); + if line.contains(text) { + return Ok(line); + } } }