contains_matches macro and more tests

This commit is contained in:
James Wilson
2021-07-12 17:21:01 +01:00
parent c6c262c9c5
commit cfe7ff39bb
4 changed files with 289 additions and 15 deletions
+187 -13
View File
@@ -1,10 +1,11 @@
//! These only run when the "e2e" feature is set (eg `cargo test --features e2e`).
//! The rust IDE plugins may behave better if you comment out this line during development:
/// #![cfg(feature = "e2e")]
#![cfg(feature = "e2e")]
use test_utils::{feed_message_de::FeedMessage, server::Server};
use test_utils::{feed_message_de::{ FeedMessage, NodeDetails }, server::Server, assert_contains_matches};
use serde_json::json;
use std::time::Duration;
use common::node_types::{ BlockHash };
#[tokio::test]
async fn feed_sent_version_on_connect() {
@@ -24,7 +25,7 @@ async fn feed_sent_version_on_connect() {
}
#[tokio::test]
async fn ping_responded_to_with_pong() {
async fn feed_ping_responded_to_with_pong() {
let server = Server::start_default()
.await
.expect("server could start");
@@ -44,15 +45,10 @@ async fn ping_responded_to_with_pong() {
}
#[tokio::test]
async fn node_can_be_added_and_removed() {
let mut server = Server::start_default()
.await
.expect("server could start");
// Add a shard:
let shard_id = server.add_shard()
.await
.expect("shard could be added");
async fn feed_add_and_remove_node() {
// Connect server and add shard
let mut server = Server::start_default().await.unwrap();
let shard_id = server.add_shard().await.unwrap();
// Connect a node to the shard:
let (mut node_tx, _node_rx) = server.get_shard(shard_id)
@@ -70,7 +66,7 @@ async fn node_can_be_added_and_removed() {
"authority":true,
"chain":"Local Testnet",
"config":"",
"genesis_hash":"0x340358f3029f5211d20d6a1f4bbe3567b39dffd35ce0d4b358fa7c62ba3f5505",
"genesis_hash": BlockHash::from_low_u64_ne(1),
"implementation":"Substrate Node",
"msg":"system.connected",
"name":"Alice",
@@ -107,6 +103,184 @@ async fn node_can_be_added_and_removed() {
}
));
// Tidy up:
server.shutdown().await;
}
#[tokio::test]
async fn feed_add_and_remove_shard() {
let mut server = Server::start_default()
.await
.expect("server could start");
let mut shards = vec![];
for id in 1 ..= 2 {
// Add a shard:
let shard_id = server.add_shard().await.unwrap();
// Connect a node to it:
let (mut node_tx, _node_rx) = server.get_shard(shard_id)
.unwrap()
.connect()
.await
.expect("can connect to shard");
// Send a "system connected" message:
node_tx.send_json_text(json!(
{
"id":id,
"ts":"2021-07-12T10:37:47.714666+01:00",
"payload": {
"authority":true,
"chain": format!("Local Testnet {}", id),
"config":"",
"genesis_hash": BlockHash::from_low_u64_ne(id),
"implementation":"Substrate Node",
"msg":"system.connected",
"name":"Alice",
"network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
"startup_time":"1625565542717",
"version":"2.0.0-07a1af348-aarch64-macos"
},
}
)).await.unwrap();
// Keep what we need to to keep connection alive and let us kill a shard:
shards.push((shard_id, node_tx));
}
let shard1 = shards.remove(0);
// Connect a feed.
let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap();
// The feed should be told about both of the chains that we've sent info about:
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert!(feed_messages.contains(
&FeedMessage::AddedChain {
name: "Local Testnet 1".to_owned(),
node_count: 1
}
));
assert!(feed_messages.contains(
&FeedMessage::AddedChain {
name: "Local Testnet 2".to_owned(),
node_count: 1
}
));
// Disconnect the first shard:
server.kill_shard(shard1.0).await;
// We should be told about the node connected to that shard disconnecting:
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert!(feed_messages.contains(
&FeedMessage::RemovedChain {
name: "Local Testnet 1".to_owned(),
}
));
assert!(!feed_messages.contains(
&FeedMessage::RemovedChain {
name: "Local Testnet 2".to_owned(),
}
));
// Tidy up:
server.shutdown().await;
}
#[tokio::test]
async fn feed_can_subscribe_and_unsubscribe_from_chain() {
use FeedMessage::*;
// Start server, add shard, connect node:
let mut server = Server::start_default().await.unwrap();
let shard_id = server.add_shard().await.unwrap();
let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect().await.unwrap();
// Send a "system connected" message for a few nodes/chains:
for id in 1..=3 {
node_tx.send_json_text(json!(
{
"id":id,
"ts":"2021-07-12T10:37:47.714666+01:00",
"payload": {
"authority":true,
"chain":format!("Local Testnet {}", id),
"config":"",
"genesis_hash": BlockHash::from_low_u64_ne(id),
"implementation":"Substrate Node",
"msg":"system.connected",
"name":format!("Alice {}", id),
"network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
"startup_time":"1625565542717",
"version":"2.0.0-07a1af348-aarch64-macos"
},
}
)).await.unwrap();
}
// Connect a feed
let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1");
// Subscribe it to a chain
feed_tx.send_command("subscribe", "Local Testnet 1").await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert_contains_matches!(
feed_messages,
SubscribedTo { name } if name == "Local Testnet 1",
TimeSync {..},
BestBlock { block_number: 0, timestamp: 0, avg_block_time: None },
BestFinalized { block_number: 0, .. },
AddedNode { node_id: 0, node: NodeDetails { name, .. }, .. } if name == "Alice 1",
FinalizedBlock { node_id: 0, block_number: 0, .. }
);
// We receive updates relating to nodes on that chain:
node_tx.send_json_text(json!(
{"id":1, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:37:48.330433+01:00" }
)).await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert_ne!(feed_messages.len(), 0);
// We don't receive anything for updates to nodes on other chains (wait a sec to ensure no messages are sent):
node_tx.send_json_text(json!(
{"id":2, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:37:48.330433+01:00" }
)).await.unwrap();
tokio::time::timeout(Duration::from_secs(1), feed_rx.recv_feed_messages())
.await
.expect_err("Timeout should elapse since no messages sent");
// We can change our subscription:
feed_tx.send_command("subscribe", "Local Testnet 2").await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
// We are told about the subscription change and given similar on-subscribe messages to above.
assert_contains_matches!(
&feed_messages,
UnsubscribedFrom { name } if name == "Local Testnet 1",
SubscribedTo { name } if name == "Local Testnet 2",
TimeSync {..},
BestBlock {..},
BestFinalized {..},
AddedNode { node: NodeDetails { name, .. }, ..} if name == "Alice 2",
FinalizedBlock {..},
);
// We didn't get messages from this earlier, but we will now we've subscribed:
node_tx.send_json_text(json!(
{"id":2, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" }
)).await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert_ne!(feed_messages.len(), 0);
// Tidy up:
server.shutdown().await;
}
@@ -0,0 +1,94 @@
/**
This macro checks to see whether an iterable container contains each of the
match items given, in the order that they are given in (but not necessarily
contiguous, ie other items may be interspersed between the ones we're looking
to match).
Similar to `matches!`.
```
enum Item {
Foo { a: usize },
Bar(bool),
Wibble
}
use Item::*;
let does_contain: bool = test_utils::contains_matches!(
vec![Foo { a: 2 }, Wibble, Bar(true), Foo { a: 100 }],
Foo { a: 2 } | Foo { a: 3 },
Bar(true),
Foo {..}
);
assert!(does_contain);
```
*/
#[macro_export]
macro_rules! contains_matches {
($expression:expr, $( $( $pattern:pat )|+ $( if $guard:expr )? ),+ $(,)?) => {{
let mut items = $expression.into_iter();
// For each pattern we want to match, we consume items until
// we find the first match, and then break the loop and do the
// same again with the next pattern. If we run out of items, we
// set the validity to false and stop trying to match. Else, we
// match againse each of the patterns and return true.
let mut is_valid = true;
$(
while is_valid {
let item = match items.next() {
Some(item) => item,
None => {
is_valid = false;
break;
}
};
match item {
$( $pattern )|+ $( if $guard )? => break,
_ => continue
}
}
)+
is_valid
}}
}
/**
This macro checks to see whether an iterable container contains each of the
match items given, in the order that they are given in (but not necessarily
contiguous, ie other items may be interspersed between the ones we're looking
to match).
Panics if this is not the case.
```
enum Item {
Foo { a: usize },
Bar(bool),
Wibble
}
use Item::*;
test_utils::assert_contains_matches!(
vec![Foo { a: 2 }, Wibble, Bar(true), Foo { a: 100 }],
Foo { a: 2 },
Bar(true),
Foo {..}
);
```
*/
#[macro_export]
macro_rules! assert_contains_matches {
($expression:expr, $( $( $pattern:pat )|+ $( if $guard:expr )? ),+ $(,)?) => {
let does_contain_matches = $crate::contains_matches!(
$expression,
$( $( $pattern )|+ $( if $guard )? ),+
);
assert!(does_contain_matches);
}
}
+5
View File
@@ -8,3 +8,8 @@ pub mod feed_message_de;
/// A wrapper around soketto to simplify the process of establishing connections
/// and sending messages. Provides cancel-safe message channels.
pub mod ws_client;
/// A couple of macros to make it easier to test for the presense of things (mainly, feed messages)
/// in an iterable container.
#[macro_use]
pub mod contains_matches;
+3 -2
View File
@@ -51,8 +51,9 @@ pub enum Error {
CannotAddShardNoHandle,
}
/// This provides back connections (or groups of connections) that are
/// hooked up to the running processes and ready to send/receive messages.
/// This represents a telemetry core process and zero or more connected shards.
/// From this, you can add/remove shards, establish node/feed connections, and
/// send/receive relevant messages from each.
pub struct Server {
/// URI to connect a shard to core:
core_shard_submit_uri: Option<http::Uri>,