tidyup, add more e2e tests, fix (and test a little) feed msg decoding, supporting bits

This commit is contained in:
James Wilson
2021-07-12 13:17:24 +01:00
parent f2f122285e
commit c6c262c9c5
9 changed files with 201 additions and 31 deletions
+1
View File
@@ -1585,6 +1585,7 @@ dependencies = [
"futures-sink",
"log",
"pin-project-lite",
"slab",
"tokio",
]
@@ -265,7 +265,6 @@ impl InnerLoop {
&genesis_hash,
feed_messages_for_chain,
);
// Tell everybody about the new node count and potential rename:
let mut feed_messages_for_all = FeedMessageSerializer::new();
if has_chain_label_changed {
@@ -376,7 +375,7 @@ impl InnerLoop {
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Pong(&value));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
}
FromFeedWebsocket::Subscribe { chain } => {
@@ -430,7 +429,7 @@ impl InnerLoop {
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
if let Some(bytes) = feed_serializer.finalize() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
}
feed_serializer.push(feed_message::AddedNode(chain_node_id, node));
@@ -444,7 +443,7 @@ impl InnerLoop {
}
}
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
// Actually make a note of the new chain subsciption:
+90 -7
View File
@@ -1,29 +1,112 @@
#![cfg(feature = "e2e")]
//! These only run when the "e2e" feature is set (eg `cargo test --features e2e`).
//! The rust IDE plugins may behave better if you comment out this line during development:
/// #![cfg(feature = "e2e")]
use test_utils::{feed_message_de::FeedMessage, server::Server};
// use serde_json::json;
use serde_json::json;
use std::time::Duration;
#[tokio::test]
async fn can_ping_feed() {
async fn feed_sent_version_on_connect() {
let server = Server::start_default()
.await
.expect("server could start");
// Connect to the feed:
let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap();
// Connect a feed:
let (_feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap();
// Expect a version response of 31:
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert_eq!(feed_messages, vec![FeedMessage::Version(31)], "expecting version");
// Tidy up:
server.shutdown().await;
}
#[tokio::test]
async fn ping_responded_to_with_pong() {
let server = Server::start_default()
.await
.expect("server could start");
// Connect a feed:
let (mut feed_tx, mut feed_rx) = server.get_core().connect().await.unwrap();
// Ping it:
feed_tx.send_command("ping", "hello!").await.unwrap();
// Expect a pong response:
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert_eq!(feed_messages, vec![FeedMessage::Pong { msg: "hello!".to_owned() }], "expecting pong");
assert!(feed_messages.contains(&FeedMessage::Pong { msg: "hello!".to_owned() }), "Expecting pong");
// Tidy up:
server.shutdown().await;
}
#[tokio::test]
async fn node_can_be_added_and_removed() {
let mut server = Server::start_default()
.await
.expect("server could start");
// Add a shard:
let shard_id = server.add_shard()
.await
.expect("shard could be added");
// Connect a node to the shard:
let (mut node_tx, _node_rx) = server.get_shard(shard_id)
.unwrap()
.connect()
.await
.expect("can connect to shard");
// Send a "system connected" message:
node_tx.send_json_text(json!(
{
"id":1,
"ts":"2021-07-12T10:37:47.714666+01:00",
"payload": {
"authority":true,
"chain":"Local Testnet",
"config":"",
"genesis_hash":"0x340358f3029f5211d20d6a1f4bbe3567b39dffd35ce0d4b358fa7c62ba3f5505",
"implementation":"Substrate Node",
"msg":"system.connected",
"name":"Alice",
"network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
"startup_time":"1625565542717",
"version":"2.0.0-07a1af348-aarch64-macos"
},
}
)).await.unwrap();
// Wait a little for this message to propagate to the core
// (so that our feed connects after the core knows and not before).
tokio::time::sleep(Duration::from_millis(500)).await;
// Connect a feed.
let (_feed_tx, mut feed_rx) = server.get_core().connect()
.await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert!(feed_messages.contains(
&FeedMessage::AddedChain {
name: "Local Testnet".to_owned(),
node_count: 1
}
));
// Disconnect the node:
node_tx.close().await.unwrap();
let feed_messages = feed_rx.recv_feed_messages().await.unwrap();
assert!(feed_messages.contains(
&FeedMessage::RemovedChain {
name: "Local Testnet".to_owned(),
}
));
// Tidy up:
server.shutdown().await;
}
+2 -3
View File
@@ -74,7 +74,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
log::info!("Opening /submit connection from {:?}", addr);
ws.on_upgrade(move |websocket| async move {
let (mut tx_to_aggregator, websocket) =
handle_websocket_connection(websocket, tx_to_aggregator, addr).await;
handle_node_websocket_connection(websocket, tx_to_aggregator, addr).await;
log::info!("Closing /submit connection from {:?}", addr);
// Tell the aggregator that this connection has closed, so it can tidy up.
let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await;
@@ -92,7 +92,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
}
/// This takes care of handling messages from an established socket connection.
async fn handle_websocket_connection<S>(
async fn handle_node_websocket_connection<S>(
mut websocket: ws::WebSocket,
mut tx_to_aggregator: S,
addr: Option<IpAddr>,
@@ -147,7 +147,6 @@ where
if !msg.is_binary() && !msg.is_text() {
continue;
}
// Deserialize from JSON, warning if deserialization fails:
let bytes = msg.as_bytes();
let node_message: json_message::NodeMessage = match serde_json::from_slice(bytes) {
+1 -1
View File
@@ -15,5 +15,5 @@ serde_json = "1.0.64"
soketto = "0.6.0"
thiserror = "1.0.25"
tokio = { version = "1.7.1", features = ["full"] }
tokio-util = "0.6.7"
tokio-util = { version = "0.6.7", features = ["full"] }
common = { path = "../common" }
+33 -1
View File
@@ -121,7 +121,7 @@ impl FeedMessage {
let v: Vec<&RawValue> = serde_json::from_slice(bytes)?;
let mut feed_messages = vec![];
for raw_keyval in v.windows(2) {
for raw_keyval in v.chunks(2) {
let raw_key = raw_keyval[0];
let raw_val = raw_keyval[1];
feed_messages.push(FeedMessage::decode(raw_key, raw_val)?);
@@ -274,4 +274,36 @@ impl FeedMessage {
Ok(feed_message)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn decode_remove_node_msg() {
// "remove chain ''":
let msg = r#"[12,""]"#;
assert_eq!(
FeedMessage::from_bytes(msg.as_bytes()).unwrap(),
vec![FeedMessage::RemovedChain { name: "".to_owned() }]
);
}
#[test]
fn decode_remove_then_add_node_msg() {
// "remove chain '', then add chain 'Local Testnet' with 1 node":
let msg = r#"[12,"",11,["Local Testnet",1]]"#;
assert_eq!(
FeedMessage::from_bytes(msg.as_bytes()).unwrap(),
vec![
FeedMessage::RemovedChain { name: "".to_owned() },
FeedMessage::AddedChain { name: "Local Testnet".to_owned(), node_count: 1 },
]
);
}
}
+38 -1
View File
@@ -1,3 +1,5 @@
use std::time::Duration;
use crate::ws_client;
use futures::{Sink, SinkExt, Stream, StreamExt};
use crate::feed_message_de::FeedMessage;
@@ -9,6 +11,13 @@ impl From<ws_client::Sender> for ShardSender {
fn from(c: ws_client::Sender) -> Self { ShardSender(c) }
}
impl ShardSender {
/// Close this connection
pub async fn close(&mut self) -> Result<(),ws_client::SendError> {
self.0.close().await
}
}
impl Sink<ws_client::Message> for ShardSender {
type Error = ws_client::SendError;
fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
@@ -97,7 +106,11 @@ impl Stream for FeedReceiver {
impl FeedReceiver {
/// Wait for the next set of feed messages to arrive. Returns an error if the connection
/// is closed, or the messages that come back cannot be properly decoded.
pub async fn recv_feed_messages(&mut self) -> Result<Vec<FeedMessage>, anyhow::Error> {
///
/// Prefer [`FeedReceiver::recv_feed_messages`]; tests should generally be
/// robust in assuming that messages may not all be delivered at once (unless we are
/// specifically testing which messages are buffered together).
pub async fn recv_feed_messages_once(&mut self) -> Result<Vec<FeedMessage>, anyhow::Error> {
let msg = self.0
.next()
.await
@@ -114,4 +127,28 @@ impl FeedReceiver {
}
}
}
/// Wait for feed messages to be sent back, building up a list of output messages until
/// the channel goes quiet for a short while.
pub async fn recv_feed_messages(&mut self) -> Result<Vec<FeedMessage>, anyhow::Error> {
// Block as long as needed for messages to start coming in:
let mut feed_messages = self.recv_feed_messages_once().await?;
// Then, loop a little to make sure we catch any additional messages that are sent soon after:
loop {
match tokio::time::timeout(Duration::from_millis(250), self.recv_feed_messages_once()).await {
// Timeout elapsed; return the messages we have so far
Err(_) => {
break Ok(feed_messages);
},
// Append messages that come back to our vec
Ok(Ok(mut msgs)) => {
feed_messages.append(&mut msgs);
},
// Error came back receiving messages; return it
Ok(Err(e)) => {
break Err(e)
}
}
}
}
}
+12 -2
View File
@@ -140,9 +140,19 @@ impl Server {
.await
.map_err(|e| Error::ErrorObtainingPort(e))?;
// Attempt to wait until we've received word that the shard is connected to the
// core before continuing. If we don't wait for this, the connection may happen
// after we've attempted to connect node sockets, and they would be booted and
// made to reconnect, which we don't want to deal with in general.
let _ = utils::wait_for_line_containing(
&mut child_stdout,
"Connected to telemetry core",
std::time::Duration::from_secs(5)
).await;
// Since we're piping stdout from the child process, we need somewhere for it to go
// else the process will get stuck when it tries to produce output:
utils::drain(child_stdout, tokio::io::stdout());
utils::drain(child_stdout, tokio::io::sink());
let shard_uri = format!("http://127.0.0.1:{}/submit", shard_port)
.parse()
@@ -189,7 +199,7 @@ impl Server {
// Since we're piping stdout from the child process, we need somewhere for it to go
// else the process will get stuck when it tries to produce output:
utils::drain(child_stdout, tokio::io::stdout());
utils::drain(child_stdout, tokio::io::sink());
// URI for feeds to connect to the core:
let feed_uri = format!("http://127.0.0.1:{}/feed", core_port)
+21 -12
View File
@@ -8,19 +8,34 @@ use anyhow::{ anyhow, Context };
/// with the side benefit that we'll wait for it to start listening before returning. We do this
/// because we want to allow the kernel to assign ports and so don't specify a port as an arg.
pub async fn get_port<R: AsyncRead + Unpin>(reader: R) -> Result<u16, anyhow::Error> {
let expected_text = "listening on http://127.0.0.1:";
wait_for_line_containing(reader, expected_text, Duration::from_secs(30))
.await
.and_then(|line| {
let (_, port_str) = line.rsplit_once(expected_text).unwrap();
port_str
.trim()
.parse()
.with_context(|| format!("Could not parse output to port: {}", port_str))
})
}
/// Wait for a line of output containing the text given. Also provide a timeout,
/// such that if we don't see a new line of output within the timeout we bail out
/// and return an error.
pub async fn wait_for_line_containing<R: AsyncRead + Unpin>(reader: R, text: &str, max_wait_between_lines: Duration) -> Result<String, anyhow::Error> {
let reader = BufReader::new(reader);
let mut reader_lines = reader.lines();
loop {
let line = tokio::time::timeout(
// This has to accomodate pauses during compilation if the cmd is "cargo run --":
Duration::from_secs(30),
max_wait_between_lines,
reader_lines.next_line()
).await;
let line = match line {
// timeout expired; couldn't get port:
Err(e) => return Err(anyhow!("Timeout expired waiting to discover port: {}", e)),
Err(_) => return Err(anyhow!("Timeout expired waiting for output containing: {}", text)),
// Something went wrong reading line; bail:
Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)),
// No more output; process ended? bail:
@@ -29,15 +44,9 @@ pub async fn get_port<R: AsyncRead + Unpin>(reader: R) -> Result<u16, anyhow::Er
Ok(Ok(Some(line))) => line
};
let (_, port_str) = match line.rsplit_once("listening on http://127.0.0.1:") {
Some(m) => m,
None => continue
};
return port_str
.trim()
.parse()
.with_context(|| format!("Could not parse output to port: {}", port_str));
if line.contains(text) {
return Ok(line);
}
}
}