diff --git a/backend/telemetry_core/tests/basic_tests.rs b/backend/telemetry_core/tests/basic_tests.rs index e9248eb..90cfec5 100644 --- a/backend/telemetry_core/tests/basic_tests.rs +++ b/backend/telemetry_core/tests/basic_tests.rs @@ -1,10 +1,11 @@ //! These only run when the "e2e" feature is set (eg `cargo test --features e2e`). //! The rust IDE plugins may behave better if you comment out this line during development: -/// #![cfg(feature = "e2e")] +#![cfg(feature = "e2e")] -use test_utils::{feed_message_de::FeedMessage, server::Server}; +use test_utils::{feed_message_de::{ FeedMessage, NodeDetails }, server::Server, assert_contains_matches}; use serde_json::json; use std::time::Duration; +use common::node_types::{ BlockHash }; #[tokio::test] async fn feed_sent_version_on_connect() { @@ -24,7 +25,7 @@ async fn feed_sent_version_on_connect() { } #[tokio::test] -async fn ping_responded_to_with_pong() { +async fn feed_ping_responded_to_with_pong() { let server = Server::start_default() .await .expect("server could start"); @@ -44,15 +45,10 @@ async fn ping_responded_to_with_pong() { } #[tokio::test] -async fn node_can_be_added_and_removed() { - let mut server = Server::start_default() - .await - .expect("server could start"); - - // Add a shard: - let shard_id = server.add_shard() - .await - .expect("shard could be added"); +async fn feed_add_and_remove_node() { + // Connect server and add shard + let mut server = Server::start_default().await.unwrap(); + let shard_id = server.add_shard().await.unwrap(); // Connect a node to the shard: let (mut node_tx, _node_rx) = server.get_shard(shard_id) @@ -70,7 +66,7 @@ async fn node_can_be_added_and_removed() { "authority":true, "chain":"Local Testnet", "config":"", - "genesis_hash":"0x340358f3029f5211d20d6a1f4bbe3567b39dffd35ce0d4b358fa7c62ba3f5505", + "genesis_hash": BlockHash::from_low_u64_ne(1), "implementation":"Substrate Node", "msg":"system.connected", "name":"Alice", @@ -107,6 +103,184 @@ async fn 
node_can_be_added_and_removed() { } )); + // Tidy up: + server.shutdown().await; +} + +#[tokio::test] +async fn feed_add_and_remove_shard() { + let mut server = Server::start_default() + .await + .expect("server could start"); + + let mut shards = vec![]; + for id in 1 ..= 2 { + // Add a shard: + let shard_id = server.add_shard().await.unwrap(); + + // Connect a node to it: + let (mut node_tx, _node_rx) = server.get_shard(shard_id) + .unwrap() + .connect() + .await + .expect("can connect to shard"); + + // Send a "system connected" message: + node_tx.send_json_text(json!( + { + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain": format!("Local Testnet {}", id), + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(id), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":"Alice", + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + )).await.unwrap(); + + // Keep what we need to to keep connection alive and let us kill a shard: + shards.push((shard_id, node_tx)); + } + + let shard1 = shards.remove(0); + + // Connect a feed. 
+ let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap(); + + // The feed should be told about both of the chains that we've sent info about: + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert!(feed_messages.contains( + &FeedMessage::AddedChain { + name: "Local Testnet 1".to_owned(), + node_count: 1 + } + )); + assert!(feed_messages.contains( + &FeedMessage::AddedChain { + name: "Local Testnet 2".to_owned(), + node_count: 1 + } + )); + + // Disconnect the first shard: + server.kill_shard(shard1.0).await; + + // We should be told about the node connected to that shard disconnecting: + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert!(feed_messages.contains( + &FeedMessage::RemovedChain { + name: "Local Testnet 1".to_owned(), + } + )); + assert!(!feed_messages.contains( + &FeedMessage::RemovedChain { + name: "Local Testnet 2".to_owned(), + } + )); + + // Tidy up: + server.shutdown().await; +} + +#[tokio::test] +async fn feed_can_subscribe_and_unsubscribe_from_chain() { + use FeedMessage::*; + + // Start server, add shard, connect node: + let mut server = Server::start_default().await.unwrap(); + let shard_id = server.add_shard().await.unwrap(); + let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect().await.unwrap(); + + // Send a "system connected" message for a few nodes/chains: + for id in 1..=3 { + node_tx.send_json_text(json!( + { + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":format!("Local Testnet {}", id), + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(id), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":format!("Alice {}", id), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + } + )).await.unwrap(); + } + + // Connect a feed + let (mut feed_tx, mut feed_rx) = 
server.get_core().connect().await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1"); + + // Subscribe it to a chain + feed_tx.send_command("subscribe", "Local Testnet 1").await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_contains_matches!( + feed_messages, + SubscribedTo { name } if name == "Local Testnet 1", + TimeSync {..}, + BestBlock { block_number: 0, timestamp: 0, avg_block_time: None }, + BestFinalized { block_number: 0, .. }, + AddedNode { node_id: 0, node: NodeDetails { name, .. }, .. } if name == "Alice 1", + FinalizedBlock { node_id: 0, block_number: 0, .. } + ); + + // We receive updates relating to nodes on that chain: + node_tx.send_json_text(json!( + {"id":1, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:37:48.330433+01:00" } + )).await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_ne!(feed_messages.len(), 0); + + // We don't receive anything for updates to nodes on other chains (wait a sec to ensure no messages are sent): + node_tx.send_json_text(json!( + {"id":2, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:37:48.330433+01:00" } + )).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(1), feed_rx.recv_feed_messages()) + .await + .expect_err("Timeout should elapse since no messages sent"); + + // We can change our subscription: + feed_tx.send_command("subscribe", "Local Testnet 2").await.unwrap(); + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + + // We are told about the subscription change and given similar on-subscribe messages to above. 
+ assert_contains_matches!( + &feed_messages, + UnsubscribedFrom { name } if name == "Local Testnet 1", + SubscribedTo { name } if name == "Local Testnet 2", + TimeSync {..}, + BestBlock {..}, + BestFinalized {..}, + AddedNode { node: NodeDetails { name, .. }, ..} if name == "Alice 2", + FinalizedBlock {..}, + ); + + // We didn't get messages from this earlier, but we will now we've subscribed: + node_tx.send_json_text(json!( + {"id":2, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" } + )).await.unwrap(); + + let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); + assert_ne!(feed_messages.len(), 0); + // Tidy up: server.shutdown().await; } \ No newline at end of file diff --git a/backend/test_utils/src/contains_matches.rs b/backend/test_utils/src/contains_matches.rs new file mode 100644 index 0000000..b619c68 --- /dev/null +++ b/backend/test_utils/src/contains_matches.rs @@ -0,0 +1,94 @@ +/** +This macro checks to see whether an iterable container contains each of the +match items given, in the order that they are given in (but not necessarily +contiguous, ie other items may be interspersed between the ones we're looking +to match). + +Similar to `matches!`. + +``` +enum Item { + Foo { a: usize }, + Bar(bool), + Wibble +} + +use Item::*; + +let does_contain: bool = test_utils::contains_matches!( + vec![Foo { a: 2 }, Wibble, Bar(true), Foo { a: 100 }], + Foo { a: 2 } | Foo { a: 3 }, + Bar(true), + Foo {..} +); + +assert!(does_contain); +``` +*/ +#[macro_export] +macro_rules! contains_matches { + ($expression:expr, $( $( $pattern:pat )|+ $( if $guard:expr )? ),+ $(,)?) => {{ + let mut items = $expression.into_iter(); + + // For each pattern we want to match, we consume items until + // we find the first match, and then break the loop and do the + // same again with the next pattern. If we run out of items, we + // set the validity to false and stop trying to match. 
Else, we + // match against each of the patterns and return true. + let mut is_valid = true; + $( + while is_valid { + let item = match items.next() { + Some(item) => item, + None => { + is_valid = false; + break; + } + }; + + match item { + $( $pattern )|+ $( if $guard )? => break, + _ => continue + } + } + )+ + + is_valid + }} +} + +/** +This macro checks to see whether an iterable container contains each of the +match items given, in the order that they are given in (but not necessarily +contiguous, ie other items may be interspersed between the ones we're looking +to match). + +Panics if this is not the case. +``` +enum Item { + Foo { a: usize }, + Bar(bool), + Wibble +} + +use Item::*; + +test_utils::assert_contains_matches!( + vec![Foo { a: 2 }, Wibble, Bar(true), Foo { a: 100 }], + Foo { a: 2 }, + Bar(true), + Foo {..} +); +``` +*/ +#[macro_export] +macro_rules! assert_contains_matches { + ($expression:expr, $( $( $pattern:pat )|+ $( if $guard:expr )? ),+ $(,)?) => { + let does_contain_matches = $crate::contains_matches!( + $expression, + $( $( $pattern )|+ $( if $guard )? ),+ + ); + + assert!(does_contain_matches); + } +} diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index fc44e9b..da19dca 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -8,3 +8,8 @@ pub mod feed_message_de; /// A wrapper around soketto to simplify the process of establishing connections /// and sending messages. Provides cancel-safe message channels. pub mod ws_client; + +/// A couple of macros to make it easier to test for the presence of things (mainly, feed messages) +/// in an iterable container. 
+#[macro_use] +pub mod contains_matches; diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index df57039..258eb39 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -51,8 +51,9 @@ pub enum Error { CannotAddShardNoHandle, } -/// This provides back connections (or groups of connections) that are -/// hooked up to the running processes and ready to send/receive messages. +/// This represents a telemetry core process and zero or more connected shards. +/// From this, you can add/remove shards, establish node/feed connections, and +/// send/receive relevant messages from each. pub struct Server { /// URI to connect a shard to core: core_shard_submit_uri: Option,