diff --git a/backend/common/src/http_utils.rs b/backend/common/src/http_utils.rs index bf2d145..49c19fc 100644 --- a/backend/common/src/http_utils.rs +++ b/backend/common/src/http_utils.rs @@ -1,29 +1,27 @@ -use std::net::SocketAddr; -use hyper::{ Server, Request, Response, Body }; -use std::future::Future; -use tokio_util::compat::{Compat,TokioAsyncReadCompatExt}; use futures::io::{BufReader, BufWriter}; use hyper::server::conn::AddrStream; +use hyper::{Body, Request, Response, Server}; +use std::future::Future; +use std::net::SocketAddr; +use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; /// A convenience function to start up a Hyper server and handle requests. pub async fn start_server(addr: SocketAddr, handler: H) -> Result<(), anyhow::Error> where H: Clone + Send + Sync + 'static + FnMut(SocketAddr, Request) -> F, - F: Send + 'static + Future, anyhow::Error>> + F: Send + 'static + Future, anyhow::Error>>, { - let service = - hyper::service::make_service_fn(move |addr: &AddrStream| { - let mut handler = handler.clone(); - let addr = addr.remote_addr(); - async move { Ok::<_, hyper::Error>(hyper::service::service_fn(move |r| handler(addr, r))) } - }); - let server = Server::bind(&addr).serve(service); + let service = hyper::service::make_service_fn(move |addr: &AddrStream| { + let mut handler = handler.clone(); + let addr = addr.remote_addr(); + async move { Ok::<_, hyper::Error>(hyper::service::service_fn(move |r| handler(addr, r))) } + }); + let server = Server::bind(&addr).serve(service); - log::info!("listening on http://{}", server.local_addr()); - server.await?; - - Ok(()) + log::info!("listening on http://{}", server.local_addr()); + server.await?; + Ok(()) } type WsStream = BufReader>>; @@ -31,27 +29,40 @@ pub type WsSender = soketto::connection::Sender; pub type WsReceiver = soketto::connection::Receiver; /// A convenience function to upgrade a Hyper request into a Soketto Websocket. -pub fn upgrade_to_websocket(req: Request, on_upgrade: H) -> hyper::Response +pub fn upgrade_to_websocket(req: Request, on_upgrade: H) -> hyper::Response where H: 'static + Send + FnOnce(WsSender, WsReceiver) -> F, - F: Send + Future + F: Send + Future, { if !is_upgrade_request(&req) { - return basic_response(400, "Expecting WebSocket upgrade headers"); + return basic_response(400, "Expecting WebSocket upgrade headers"); } - let key = match req.headers().get("Sec-WebSocket-Key") { + let key = match req.headers().get("Sec-WebSocket-Key") { Some(key) => key, - None => return basic_response(400, "Upgrade to websocket connection failed; Sec-WebSocket-Key header not provided") + None => { + return basic_response( + 400, + "Upgrade to websocket connection failed; Sec-WebSocket-Key header not provided", + ) + } }; - if req.headers().get("Sec-WebSocket-Version").map(|v| v.as_bytes()) != Some(b"13") { - return basic_response(400, "Sec-WebSocket-Version header should have a value of 13"); - } + if req + .headers() + .get("Sec-WebSocket-Version") + .map(|v| v.as_bytes()) + != Some(b"13") + { + return basic_response( + 400, + "Sec-WebSocket-Version header should have a value of 13", + ); + } - // Just a little ceremony we need to go to to return the correct response key: - let mut accept_key_buf = [0; 32]; - let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf); + // Just a little ceremony we need to go to to return the correct response key: + let mut accept_key_buf = [0; 32]; + let accept_key = generate_websocket_accept_key(key.as_bytes(), &mut accept_key_buf); // Tell the client that we accept the upgrade-to-WS request: let response = Response::builder() @@ -74,13 +85,11 @@ where }; // Start a Soketto server with it: - let server = soketto::handshake::Server::new( - BufReader::new(BufWriter::new(stream.compat())) - ); + let server = + soketto::handshake::Server::new(BufReader::new(BufWriter::new(stream.compat()))); // Get hold of a way to send and receive messages: - let (sender, receiver) - = server.into_builder().finish(); + let (sender, receiver) = server.into_builder().finish(); // Pass these to our when-upgraded handler: on_upgrade(sender, receiver).await; @@ -100,40 +109,48 @@ fn basic_response(code: u16, msg: impl AsRef) -> Response { /// Defined in RFC 6455. this is how we convert the Sec-WebSocket-Key in a request into a /// Sec-WebSocket-Accept that we return in the response. fn generate_websocket_accept_key<'a>(key: &[u8], buf: &'a mut [u8; 32]) -> &'a [u8] { - // Defined in RFC 6455, we append this to the key to generate the response: - const KEY: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + // Defined in RFC 6455, we append this to the key to generate the response: + const KEY: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - use sha1::{Digest, Sha1}; - let mut digest = Sha1::new(); - digest.update(key); - digest.update(KEY); - let d = digest.finalize(); + use sha1::{Digest, Sha1}; + let mut digest = Sha1::new(); + digest.update(key); + digest.update(KEY); + let d = digest.finalize(); - let n = base64::encode_config_slice(&d, base64::STANDARD, buf); - &buf[..n] + let n = base64::encode_config_slice(&d, base64::STANDARD, buf); + &buf[..n] } /// Check if a request is a websocket upgrade request. fn is_upgrade_request(request: &hyper::Request) -> bool { - header_contains_value(request.headers(), hyper::header::CONNECTION, b"upgrade") - && header_contains_value(request.headers(), hyper::header::UPGRADE, b"websocket") + header_contains_value(request.headers(), hyper::header::CONNECTION, b"upgrade") + && header_contains_value(request.headers(), hyper::header::UPGRADE, b"websocket") } /// Check if there is a header of the given name containing the wanted value. -fn header_contains_value(headers: &hyper::HeaderMap, header: hyper::header::HeaderName, value: &[u8]) -> bool { - pub fn trim(x: &[u8]) -> &[u8] { - let from = match x.iter().position(|x| !x.is_ascii_whitespace()) { - Some(i) => i, - None => return &x[0..0], - }; - let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap(); - &x[from..=to] - } +fn header_contains_value( + headers: &hyper::HeaderMap, + header: hyper::header::HeaderName, + value: &[u8], +) -> bool { + pub fn trim(x: &[u8]) -> &[u8] { + let from = match x.iter().position(|x| !x.is_ascii_whitespace()) { + Some(i) => i, + None => return &x[0..0], + }; + let to = x.iter().rposition(|x| !x.is_ascii_whitespace()).unwrap(); + &x[from..=to] + } - for header in headers.get_all(header) { - if header.as_bytes().split(|&c| c == b',').any(|x| trim(x).eq_ignore_ascii_case(value)) { - return true; - } - } - false -} \ No newline at end of file + for header in headers.get_all(header) { + if header + .as_bytes() + .split(|&c| c == b',') + .any(|x| trim(x).eq_ignore_ascii_case(value)) + { + return true; + } + } + false +} diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 19ff1cb..2314d16 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -1,11 +1,11 @@ +pub mod http_utils; pub mod id_type; pub mod internal_messages; pub mod node_message; pub mod node_types; +pub mod ready_chunks_all; pub mod time; pub mod ws_client; -pub mod ready_chunks_all; -pub mod http_utils; mod assign_id; mod dense_map; diff --git a/backend/common/src/ready_chunks_all.rs b/backend/common/src/ready_chunks_all.rs index f4cc367..f5b5872 100644 --- a/backend/common/src/ready_chunks_all.rs +++ b/backend/common/src/ready_chunks_all.rs @@ -9,12 +9,12 @@ //! Code is adapted from the futures implementation //! (see [ready_chunks.rs](https://docs.rs/futures-util/0.3.15/src/futures_util/stream/stream/ready_chunks.rs.html)). -use futures::stream::Fuse; -use futures::StreamExt; use core::mem; use core::pin::Pin; +use futures::stream::Fuse; use futures::stream::{FusedStream, Stream}; use futures::task::{Context, Poll}; +use futures::StreamExt; use pin_project_lite::pin_project; pin_project! { @@ -37,7 +37,7 @@ where pub fn new(stream: St) -> Self { Self { stream: stream.fuse(), - items: Vec::new() + items: Vec::new(), } } } diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 7a399ca..9cf909b 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -5,21 +5,22 @@ use tokio::net::TcpStream; use tokio_util::compat::TokioAsyncReadCompatExt; use super::{ - sender::{ Sender, SentMessage, SentMessageInternal }, - receiver::{ Receiver, RecvMessage } + receiver::{Receiver, RecvMessage}, + sender::{Sender, SentMessage, SentMessageInternal}, }; /// The send side of a Soketto WebSocket connection pub type RawSender = soketto::connection::Sender>; /// The receive side of a Soketto WebSocket connection -pub type RawReceiver = soketto::connection::Receiver>; +pub type RawReceiver = + soketto::connection::Receiver>; /// A websocket connection. From this, we can either expose the raw connection /// or expose a cancel-safe interface to it. pub struct Connection { tx: soketto::connection::Sender>, - rx: soketto::connection::Receiver> + rx: soketto::connection::Receiver>, } impl Connection { @@ -109,7 +110,7 @@ impl Connection { ); break; } - }, + } SentMessageInternal::Message(SentMessage::StaticText(s)) => { if let Err(e) = ws_to_connection.send_text(s).await { log::error!( @@ -127,7 +128,7 @@ impl Connection { ); break; } - }, + } SentMessageInternal::Close => { if let Err(e) = ws_to_connection.close().await { log::error!("Error attempting to close connection: {}", e); @@ -185,6 +186,6 @@ pub async fn connect(uri: &http::Uri) -> Result { Ok(Connection { tx: ws_to_connection, - rx: ws_from_connection + rx: ws_from_connection, }) } diff --git a/backend/common/src/ws_client/mod.rs b/backend/common/src/ws_client/mod.rs index a5d1b66..da37f6f 100644 --- a/backend/common/src/ws_client/mod.rs +++ b/backend/common/src/ws_client/mod.rs @@ -1,10 +1,10 @@ /// Functionality to establish a connection mod connect; -/// The channel based send interface -mod sender; /// The channel based receive interface mod receiver; +/// The channel based send interface +mod sender; -pub use connect::{ connect, ConnectError, Connection, RawSender, RawReceiver }; -pub use sender::{ Sender, SentMessage, SendError }; -pub use receiver::{ Receiver, RecvMessage, RecvError }; \ No newline at end of file +pub use connect::{connect, ConnectError, Connection, RawReceiver, RawSender}; +pub use receiver::{Receiver, RecvError, RecvMessage}; +pub use sender::{SendError, Sender, SentMessage}; diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index 1d362a3..dfaed84 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -3,7 +3,7 @@ use futures::{Stream, StreamExt}; /// Receive messages out of a connection pub struct Receiver { - pub (super) inner: mpsc::UnboundedReceiver>, + pub(super) inner: mpsc::UnboundedReceiver>, } #[derive(thiserror::Error, Debug)] @@ -40,4 +40,4 @@ impl RecvMessage { RecvMessage::Text(s) => s.len(), } } -} \ No newline at end of file +} diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 09a7c86..392de0c 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -22,7 +22,7 @@ pub enum SentMessage { /// Messages sent into the channel interface can be anything publically visible, or a close message. #[derive(Debug, Clone)] -pub (super) enum SentMessageInternal { +pub(super) enum SentMessageInternal { Message(SentMessage), Close, } @@ -30,7 +30,7 @@ pub (super) enum SentMessageInternal { /// Send messages into the connection #[derive(Clone)] pub struct Sender { - pub (super) inner: mpsc::UnboundedSender, + pub(super) inner: mpsc::UnboundedSender, } impl Sender { @@ -56,7 +56,7 @@ impl Sender { #[derive(thiserror::Error, Debug, Clone)] pub enum SendError { #[error("Failed to send message: {0}")] - ChannelError(#[from] mpsc::SendError) + ChannelError(#[from] mpsc::SendError), } impl Sink for Sender { @@ -67,7 +67,10 @@ impl Sink for Sender { ) -> std::task::Poll> { self.inner.poll_ready_unpin(cx).map_err(|e| e.into()) } - fn start_send(mut self: std::pin::Pin<&mut Self>, item: SentMessage) -> Result<(), Self::Error> { + fn start_send( + mut self: std::pin::Pin<&mut Self>, + item: SentMessage, + ) -> Result<(), Self::Error> { self.inner .start_send_unpin(SentMessageInternal::Message(item)) .map_err(|e| e.into()) @@ -84,4 +87,4 @@ impl Sink for Sender { ) -> std::task::Poll> { self.inner.poll_close_unpin(cx).map_err(|e| e.into()) } -} \ No newline at end of file +} diff --git a/backend/telemetry_core/benches/throughput.rs b/backend/telemetry_core/benches/throughput.rs index 1b796a4..c8b19ee 100644 --- a/backend/telemetry_core/benches/throughput.rs +++ b/backend/telemetry_core/benches/throughput.rs @@ -1,98 +1,98 @@ use std::iter::FromIterator; -use futures::StreamExt; -use test_utils::workspace::start_server_release; -use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::runtime::Runtime; -use serde_json::json; use common::node_types::BlockHash; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures::StreamExt; +use serde_json::json; +use test_utils::workspace::start_server_release; +use tokio::runtime::Runtime; pub fn benchmark_throughput_single_shard(c: &mut Criterion) { /* - let rt = Runtime::new().expect("tokio runtime should start"); + let rt = Runtime::new().expect("tokio runtime should start"); - // Setup our server and node/feed connections first: - let (nodes, feeds) = rt.block_on(async { - let mut server = start_server_release().await; - let shard_id = server.add_shard().await.unwrap(); + // Setup our server and node/feed connections first: + let (nodes, feeds) = rt.block_on(async { + let mut server = start_server_release().await; + let shard_id = server.add_shard().await.unwrap(); - // Connect 1000 nodes to the shard: - let mut nodes = server - .get_shard(shard_id) - .unwrap() - .connect_multiple(1000) - .await - .expect("nodes can connect"); + // Connect 1000 nodes to the shard: + let mut nodes = server + .get_shard(shard_id) + .unwrap() + .connect_multiple(1000) + .await + .expect("nodes can connect"); - // Every node announces itself on the same chain: - for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { - node_tx.send_json_text(json!({ - "id":1, // message ID, not node ID. Can be the same for all. - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":"Local Testnet", - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Alice {}", idx), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - } - })).await.unwrap(); + // Every node announces itself on the same chain: + for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { + node_tx.send_json_text(json!({ + "id":1, // message ID, not node ID. Can be the same for all. + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Local Testnet", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Alice {}", idx), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })).await.unwrap(); + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + // Start 1000 feeds: + let mut feeds = server + .get_core() + .connect_multiple(1) + .await + .expect("feeds can connect"); + + // // Subscribe all feeds to the chain: + // for (feed_tx, _) in feeds.iter_mut() { + // feed_tx.send_command("subscribe", "Local Testnet").await.unwrap(); + // } + + println!("consuming feed"); + { + + let mut msgs = futures::stream::FuturesUnordered::from_iter( + feeds + .iter_mut() + .map(|(_,rx)| rx.recv_feed_messages()) + ); + + let mut n = 0; + while let Some(Ok(msg)) = msgs.next().await { + n += 1; + println!("Message {}: {:?}", n, msg); } -tokio::time::sleep(std::time::Duration::from_millis(500)).await; - // Start 1000 feeds: - let mut feeds = server - .get_core() - .connect_multiple(1) - .await - .expect("feeds can connect"); - - // // Subscribe all feeds to the chain: - // for (feed_tx, _) in feeds.iter_mut() { - // feed_tx.send_command("subscribe", "Local Testnet").await.unwrap(); - // } - -println!("consuming feed"); -{ - - let mut msgs = futures::stream::FuturesUnordered::from_iter( - feeds - .iter_mut() - .map(|(_,rx)| rx.recv_feed_messages()) - ); - - let mut n = 0; - while let Some(Ok(msg)) = msgs.next().await { - n += 1; - println!("Message {}: {:?}", n, msg); } -} - // // Consume any messages feeds have received so far (every feed should havea few at least): - // let feed_consumers = feeds - // .iter_mut() - // .map(|(_,rx)| rx.next()); - // futures::future::join_all(feed_consumers).await; -println!("feed consumed"); - (nodes, feeds) - }); + // // Consume any messages feeds have received so far (every feed should havea few at least): + // let feed_consumers = feeds + // .iter_mut() + // .map(|(_,rx)| rx.next()); + // futures::future::join_all(feed_consumers).await; + println!("feed consumed"); + (nodes, feeds) + }); - // Next, run criterion using the same tokio runtime to benchmark time taken to send - // messages to nodes and receive them from feeds. - c.bench_function( - "throughput time", - |b| b.to_async(&rt).iter(|| async { + // Next, run criterion using the same tokio runtime to benchmark time taken to send + // messages to nodes and receive them from feeds. + c.bench_function( + "throughput time", + |b| b.to_async(&rt).iter(|| async { - // TODO: Actually implement the benchmark. + // TODO: Actually implement the benchmark. - }) - ); - */ + }) + ); + */ } criterion_group!(benches, benchmark_throughput_single_shard); -criterion_main!(benches); \ No newline at end of file +criterion_main!(benches); diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index 2a049c1..bdfbd1f 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -74,7 +74,8 @@ impl Aggregator { /// Return a sink that a shard can send messages into to be handled by the aggregator. pub fn subscribe_shard( &self, - ) -> impl Sink + Send + Sync + Unpin + 'static { + ) -> impl Sink + Send + Sync + Unpin + 'static + { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: let shard_conn_id = self @@ -96,7 +97,8 @@ impl Aggregator { /// Return a sink that a feed can send messages into to be handled by the aggregator. pub fn subscribe_feed( &self, - ) -> impl Sink + Send + Sync + Unpin + 'static { + ) -> impl Sink + Send + Sync + Unpin + 'static + { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: let feed_conn_id = self diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index d3dc61a..e9bf79b 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -179,11 +179,7 @@ impl InnerLoop { } /// Handle messages that come from the node geographical locator. - fn handle_from_find_location( - &mut self, - node_id: NodeId, - location: find_location::Location, - ) { + fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) { self.node_state .update_node_location(node_id, location.clone()); @@ -227,20 +223,18 @@ impl InnerLoop { match self.node_state.add_node(genesis_hash, node) { state::AddNodeResult::ChainOnDenyList => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn - .unbounded_send(ToShardWebsocket::Mute { - local_id, - reason: MuteReason::ChainNotAllowed, - }); + let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute { + local_id, + reason: MuteReason::ChainNotAllowed, + }); } } state::AddNodeResult::ChainOverQuota => { if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) { - let _ = shard_conn - .unbounded_send(ToShardWebsocket::Mute { - local_id, - reason: MuteReason::Overquota, - }); + let _ = shard_conn.unbounded_send(ToShardWebsocket::Mute { + local_id, + reason: MuteReason::Overquota, + }); } } state::AddNodeResult::NodeAddedToChain(details) => { @@ -473,10 +467,7 @@ impl InnerLoop { } /// Remove all of the node IDs provided and broadcast messages to feeds as needed. - fn remove_nodes_and_broadcast_result( - &mut self, - node_ids: impl IntoIterator, - ) { + fn remove_nodes_and_broadcast_result(&mut self, node_ids: impl IntoIterator) { // Group by chain to simplify the handling of feed messages: let mut node_ids_per_chain: HashMap> = HashMap::new(); for node_id in node_ids.into_iter() { diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 3824e71..589fa10 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -3,19 +3,19 @@ mod feed_message; mod find_location; mod state; use std::str::FromStr; -use tokio::time::{ Duration, Instant }; +use tokio::time::{Duration, Instant}; use aggregator::{ Aggregator, FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket, }; use bincode::Options; +use common::http_utils; use common::internal_messages; use common::ready_chunks_all::ReadyChunksAll; use futures::{channel::mpsc, SinkExt, StreamExt}; +use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; -use hyper::{ Response, Method }; -use common::http_utils; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -42,7 +42,7 @@ struct Opts { /// If it takes longer than this number of seconds to send the current batch of messages /// to a feed, the feed connection will be closed. #[structopt(long, default_value = "10")] - feed_timeout: u64 + feed_timeout: u64, } #[tokio::main] @@ -72,40 +72,55 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { async move { match (req.method(), req.uri().path().trim_end_matches('/')) { // Check that the server is up and running: - (&Method::GET, "/health") => { - Ok(Response::new("OK".into())) - }, + (&Method::GET, "/health") => Ok(Response::new("OK".into())), // Subscribe to feed messages: (&Method::GET, "/feed") => { - Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move { - let tx_to_aggregator = aggregator.subscribe_feed(); - let (mut tx_to_aggregator, mut ws_send) - = handle_feed_websocket_connection(ws_send, ws_recv, tx_to_aggregator, feed_timeout).await; - log::info!("Closing /feed connection from {:?}", addr); - // Tell the aggregator that this connection has closed, so it can tidy up. - let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; - let _ = ws_send.close().await; - })) - }, + Ok(http_utils::upgrade_to_websocket( + req, + move |ws_send, ws_recv| async move { + let tx_to_aggregator = aggregator.subscribe_feed(); + let (mut tx_to_aggregator, mut ws_send) = + handle_feed_websocket_connection( + ws_send, + ws_recv, + tx_to_aggregator, + feed_timeout, + ) + .await; + log::info!("Closing /feed connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromFeedWebsocket::Disconnected).await; + let _ = ws_send.close().await; + }, + )) + } // Subscribe to shard messages: (&Method::GET, "/shard_submit") => { - Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move { - let tx_to_aggregator = aggregator.subscribe_shard(); - let (mut tx_to_aggregator, mut ws_send) - = handle_shard_websocket_connection(ws_send, ws_recv, tx_to_aggregator).await; - log::info!("Closing /shard_submit connection from {:?}", addr); - // Tell the aggregator that this connection has closed, so it can tidy up. - let _ = tx_to_aggregator.send(FromShardWebsocket::Disconnected).await; - let _ = ws_send.close().await; - })) - }, - // 404 for anything else: - _ => { - Ok(Response::builder() - .status(404) - .body("Not found".into()) - .unwrap()) + Ok(http_utils::upgrade_to_websocket( + req, + move |ws_send, ws_recv| async move { + let tx_to_aggregator = aggregator.subscribe_shard(); + let (mut tx_to_aggregator, mut ws_send) = + handle_shard_websocket_connection( + ws_send, + ws_recv, + tx_to_aggregator, + ) + .await; + log::info!("Closing /shard_submit connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator + .send(FromShardWebsocket::Disconnected) + .await; + let _ = ws_send.close().await; + }, + )) } + // 404 for anything else: + _ => Ok(Response::builder() + .status(404) + .body("Not found".into()) + .unwrap()), } } }); @@ -156,29 +171,44 @@ where break; } if let Err(e) = msg_info { - log::error!("Shutting down websocket connection: Failed to receive data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to receive data: {}", + e + ); break; } - let msg: internal_messages::FromShardAggregator = match bincode::options().deserialize(&bytes) { - Ok(msg) => msg, - Err(e) => { - log::error!("Failed to deserialize message from shard; booting it: {}", e); - break; - } - }; + let msg: internal_messages::FromShardAggregator = + match bincode::options().deserialize(&bytes) { + Ok(msg) => msg, + Err(e) => { + log::error!( + "Failed to deserialize message from shard; booting it: {}", + e + ); + break; + } + }; // Convert and send to the aggregator: let aggregator_msg = match msg { - internal_messages::FromShardAggregator::AddNode { ip, node, local_id, genesis_hash } => { - FromShardWebsocket::Add { ip, node, genesis_hash, local_id } + internal_messages::FromShardAggregator::AddNode { + ip, + node, + local_id, + genesis_hash, + } => FromShardWebsocket::Add { + ip, + node, + genesis_hash, + local_id, }, internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => { FromShardWebsocket::Update { local_id, payload } - }, + } internal_messages::FromShardAggregator::RemoveNode { local_id } => { FromShardWebsocket::Remove { local_id } - }, + } }; if let Err(e) = tx_to_aggregator.send(aggregator_msg).await { @@ -201,7 +231,7 @@ where let msg = match msg { Some(msg) => msg, - None => break + None => break, }; let internal_msg = match msg { @@ -218,9 +248,11 @@ where log::error!("Failed to send message to aggregator; closing shard: {}", e) } if let Err(e) = ws_send.flush().await { - log::error!("Failed to flush message to aggregator; closing shard: {}", e) + log::error!( + "Failed to flush message to aggregator; closing shard: {}", + e + ) } - } drop(recv_closer_tx); // Kill the recv task if this send task ends @@ -241,7 +273,7 @@ async fn handle_feed_websocket_connection( mut ws_send: http_utils::WsSender, mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, - feed_timeout: u64 + feed_timeout: u64, ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, @@ -281,29 +313,35 @@ where break; } if let Err(e) = msg_info { - log::error!("Shutting down websocket connection: Failed to receive data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to receive data: {}", + e + ); break; } // We ignore all but valid UTF8 text messages from the frontend: let text = match String::from_utf8(bytes) { Ok(s) => s, - Err(_) => continue + Err(_) => continue, }; // Parse the message into a command we understand and send it to the aggregator: let cmd = match FromFeedWebsocket::from_str(&text) { Ok(cmd) => cmd, Err(e) => { - log::warn!("Ignoring invalid command '{}' from the frontend: {}", text, e); - continue + log::warn!( + "Ignoring invalid command '{}' from the frontend: {}", + text, + e + ); + continue; } }; if let Err(e) = tx_to_aggregator.send(cmd).await { log::error!("Failed to send message to aggregator; closing feed: {}", e); break; } - } drop(send_closer_tx); // Kill the send task if this recv task ends @@ -323,15 +361,13 @@ where // End the loop when connection from aggregator ends: let msgs = match msgs { Some(msgs) => msgs, - None => break + None => break, }; // There is only one message type at the mo; bytes to send // to the websocket. collect them all up to dispatch in one shot. - let all_msg_bytes = msgs.into_iter().map(|msg| { - match msg { - ToFeedWebsocket::Bytes(bytes) => bytes - } + let all_msg_bytes = msgs.into_iter().map(|msg| match msg { + ToFeedWebsocket::Bytes(bytes) => bytes, }); // We have a deadline to send and flush messages. If the client isn't keeping up with our @@ -340,7 +376,9 @@ where let message_send_deadline = Instant::now() + Duration::from_secs(feed_timeout); for bytes in all_msg_bytes { - match tokio::time::timeout_at(message_send_deadline, ws_send.send_binary(&bytes)).await { + match tokio::time::timeout_at(message_send_deadline, ws_send.send_binary(&bytes)) + .await + { Err(_) => { log::warn!("Closing feed websocket that was too slow to keep up (1)"); break 'outer; @@ -355,7 +393,7 @@ where match tokio::time::timeout_at(message_send_deadline, ws_send.flush()).await { Err(_) => { log::warn!("Closing feed websocket that was too slow to keep up (2)"); - break + break; } Ok(Err(e)) => { log::warn!("Closing feed websocket due to error flushing data: {}", e); diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index dec48c2..2647a00 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -6,7 +6,7 @@ use std::time::Duration; use test_utils::{ assert_contains_matches, feed_message_de::{FeedMessage, NodeDetails}, - workspace::{ start_server, CoreOpts, ShardOpts, start_server_debug } + workspace::{start_server, start_server_debug, CoreOpts, ShardOpts}, }; /// The simplest test we can run; the main benefit of this test (since we check similar) @@ -55,7 +55,6 @@ async fn feed_ping_responded_to_with_pong() { server.shutdown().await; } - /// As a prelude to `lots_of_mute_messages_dont_cause_a_deadlock`, we can check that /// a lot of nodes can simultaneously subscribe and are all sent the expected response. #[tokio::test] @@ -70,10 +69,8 @@ async fn multiple_feeds_sent_version_on_connect() { .unwrap(); // Wait for responses all at once: - let responses = futures::future::join_all( - feeds.iter_mut() - .map(|(_, rx)| rx.recv_feed_messages()) - ); + let responses = + futures::future::join_all(feeds.iter_mut().map(|(_, rx)| rx.recv_feed_messages())); let responses = tokio::time::timeout(Duration::from_secs(10), responses) .await @@ -113,22 +110,24 @@ async fn lots_of_mute_messages_dont_cause_a_deadlock() { // Every node announces itself on the same chain: for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { - node_tx.send_json_text(json!({ - "id":1, // message ID, not node ID. Can be the same for all. - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":"Local Testnet", - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Alice {}", idx), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - } - })).unwrap(); + node_tx + .send_json_text(json!({ + "id":1, // message ID, not node ID. Can be the same for all. + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Local Testnet", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Alice {}", idx), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })) + .unwrap(); } // Wait a little time (just to let everything get deadlocked) before @@ -144,11 +143,8 @@ async fn lots_of_mute_messages_dont_cause_a_deadlock() { .expect("feeds can connect"); // Wait to see whether we get anything back: - let msgs_fut = futures::future::join_all( - feeds - .iter_mut() - .map(|(_,rx)| rx.recv_feed_messages()) - ); + let msgs_fut = + futures::future::join_all(feeds.iter_mut().map(|(_, rx)| rx.recv_feed_messages())); // Give up after a timeout: tokio::time::timeout(Duration::from_secs(10), msgs_fut) @@ -234,29 +230,35 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { .await .expect("can connect to shard"); - let node_init_msg = |id, chain_name: &str, node_name: &str| json!({ - "id":id, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain": chain_name, - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": node_name, - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - }); + let node_init_msg = |id, chain_name: &str, node_name: &str| { + json!({ + "id":id, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain": chain_name, + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": node_name, + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + }) + }; // Subscribe a chain: - node_tx.send_json_text(node_init_msg(1, "Initial chain name", "Node 1")).unwrap(); + node_tx + .send_json_text(node_init_msg(1, "Initial chain name", "Node 1")) + .unwrap(); // Connect a feed and subscribe to the above chain: let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); - feed_tx.send_command("subscribe", "Initial chain name").unwrap(); + feed_tx + .send_command("subscribe", "Initial chain name") + .unwrap(); // Feed is told about the chain, and the node on this chain: let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); @@ -269,7 +271,9 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { // Subscribe another node. The chain doesn't rename yet but we are told about the new node // count and the node that's been added. - node_tx.send_json_text(node_init_msg(2, "New chain name", "Node 2")).unwrap(); + node_tx + .send_json_text(node_init_msg(2, "New chain name", "Node 2")) + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!( feed_messages, @@ -279,7 +283,9 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { // Subscribe a third node. The chain renames, so we're told about the new node but also // about the chain rename. - node_tx.send_json_text(node_init_msg(3, "New chain name", "Node 3")).unwrap(); + node_tx + .send_json_text(node_init_msg(3, "New chain name", "Node 3")) + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!( feed_messages, @@ -290,14 +296,15 @@ async fn feeds_told_about_chain_rename_and_stay_subscribed() { // Just to be sure, subscribing a fourth node on this chain will still lead to updates // to this feed. - node_tx.send_json_text(node_init_msg(4, "New chain name", "Node 4")).unwrap(); + node_tx + .send_json_text(node_init_msg(4, "New chain name", "Node 4")) + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!( feed_messages, FeedMessage::AddedNode { node: NodeDetails { name: node_name, .. }, ..} if node_name == "Node 4", FeedMessage::AddedChain { name, node_count: 4 } if name == "New chain name", ); - } /// If we add a couple of shards and a node for each, all feeds should be @@ -386,7 +393,12 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { // Start server, add shard, connect node: let mut server = start_server_debug().await; let shard_id = server.add_shard().await.unwrap(); - let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap(); + let (mut node_tx, _node_rx) = server + .get_shard(shard_id) + .unwrap() + .connect_node() + .await + .unwrap(); // Send a "system connected" message for a few nodes/chains: for id in 1..=3 { @@ -419,7 +431,9 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { assert_contains_matches!(feed_messages, AddedChain { name, node_count: 1 } if name == "Local Testnet 1"); // Subscribe it to a chain - feed_tx.send_command("subscribe", "Local Testnet 1").unwrap(); + feed_tx + .send_command("subscribe", "Local Testnet 1") + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); assert_contains_matches!( @@ -450,7 +464,9 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() { .expect_err("Timeout should elapse since no messages sent"); // We can change our subscription: - feed_tx.send_command("subscribe", "Local Testnet 2").unwrap(); + feed_tx + .send_command("subscribe", "Local Testnet 2") + .unwrap(); let feed_messages = feed_rx.recv_feed_messages().await.unwrap(); // We are told about the subscription change and given similar on-subscribe messages to above. @@ -484,36 +500,48 @@ async fn slow_feeds_are_disconnected() { let mut server = start_server( true, // Timeout faster so the test can be quicker: - CoreOpts { feed_timeout: Some(1) }, + CoreOpts { + feed_timeout: Some(1), + }, // Allow us to send more messages in more easily: - ShardOpts { max_nodes_per_connection: Some(100_000) } - ).await; + ShardOpts { + max_nodes_per_connection: Some(100_000), + }, + ) + .await; // Give us a shard to talk to: let shard_id = server.add_shard().await.unwrap(); - let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap(); + let (mut node_tx, _node_rx) = server + .get_shard(shard_id) + .unwrap() + .connect_node() + .await + .unwrap(); // Add a load of nodes from this shard so there's plenty of data to give to a feed. // We want to exhaust any buffers between core and feed (eg BufWriters). If the number // is too low, data will happily be sent into a buffer and the connection won't need to // be closed. for n in 1..50_000 { - node_tx.send_json_text(json!({ - "id":n, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":"Polkadot", - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Alice {}", n), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - } - })).unwrap(); + node_tx + .send_json_text(json!({ + "id":n, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Polkadot", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Alice {}", n), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })) + .unwrap(); } // Connect a raw feed so that we can control how fast we consume data from the websocket @@ -530,10 +558,8 @@ async fn slow_feeds_are_disconnected() { // waiting to receive mroe data (or see some other error). loop { let mut v = Vec::new(); - let data = tokio::time::timeout( - Duration::from_secs(1), - raw_feed_rx.receive_data(&mut v) - ).await; + let data = + tokio::time::timeout(Duration::from_secs(1), raw_feed_rx.receive_data(&mut v)).await; match data { Ok(Ok(_)) => { @@ -541,13 +567,13 @@ async fn slow_feeds_are_disconnected() { } Ok(Err(soketto::connection::Error::Closed)) => { break; // End loop; success! - }, + } Ok(Err(e)) => { panic!("recv should be closed but instead we saw this error: {}", e); - }, + } Err(_) => { panic!("recv should be closed but seems to be happy waiting for more data"); - }, + } } } @@ -564,49 +590,87 @@ async fn max_nodes_per_connection_is_enforced() { false, CoreOpts::default(), // Limit max nodes per connection to 2; any other msgs should be ignored. - ShardOpts { max_nodes_per_connection: Some(2) } - ).await; + ShardOpts { + max_nodes_per_connection: Some(2), + }, + ) + .await; // Connect to a shard let shard_id = server.add_shard().await.unwrap(); - let (mut node_tx, _node_rx) = server.get_shard(shard_id).unwrap().connect_node().await.unwrap(); + let (mut node_tx, _node_rx) = server + .get_shard(shard_id) + .unwrap() + .connect_node() + .await + .unwrap(); // Connect a feed. let (mut feed_tx, mut feed_rx) = server.get_core().connect_feed().await.unwrap(); // We'll send these messages from the node: - let json_msg = |n| json!({ - "id":n, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":"Test Chain", - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Polkadot", - "msg":"system.connected", - "name": format!("Alice {}", n), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - } - }); + let json_msg = |n| { + json!({ + "id":n, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Test Chain", + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Polkadot", + "msg":"system.connected", + "name": format!("Alice {}", n), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + }) + }; // First message ID should lead to feed messages: node_tx.send_json_text(json_msg(1)).unwrap(); - assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_ne!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); // Second message ID should lead to feed messages as well: node_tx.send_json_text(json_msg(2)).unwrap(); - assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_ne!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); // Third message ID should be ignored: node_tx.send_json_text(json_msg(3)).unwrap(); - assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_eq!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); // Forth message ID should be ignored as well: node_tx.send_json_text(json_msg(4)).unwrap(); - assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_eq!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); // (now that the chain "Test Chain" is known about, subscribe to it for update messages. // This wasn't needed to receive messages re the above since everybody hears about node @@ -619,25 +683,53 @@ async fn max_nodes_per_connection_is_enforced() { node_tx.send_json_text(json!( {"id":1, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" } )).unwrap(); - assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_ne!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); node_tx.send_json_text(json!( {"id":2, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" } )).unwrap(); - assert_ne!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_ne!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); // Updates about ignored IDs are still ignored: node_tx.send_json_text(json!( {"id":3, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" } )).unwrap(); - assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_eq!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); node_tx.send_json_text(json!( {"id":4, "payload":{ "bandwidth_download":576,"bandwidth_upload":576,"msg":"system.interval","peers":1},"ts":"2021-07-12T10:38:48.330433+01:00" } )).unwrap(); - assert_eq!(feed_rx.recv_feed_messages_timeout(Duration::from_secs(1)).await.unwrap().len(), 0); + assert_eq!( + feed_rx + .recv_feed_messages_timeout(Duration::from_secs(1)) + .await + .unwrap() + .len(), + 0 + ); // Tidy up: server.shutdown().await; -} \ No newline at end of file +} diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index e832bec..c9e2e53 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -18,15 +18,15 @@ In general, if you run into issues, it may be better to run this on a linux box; MacOS seems to hit limits quicker in general. */ -use futures::{ StreamExt }; +use common::node_types::BlockHash; +use common::ws_client::SentMessage; +use futures::StreamExt; +use serde_json::json; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; use structopt::StructOpt; use test_utils::workspace::start_server_release; -use common::ws_client::{ SentMessage }; -use serde_json::json; -use std::time::Duration; -use std::sync::atomic::{ Ordering, AtomicUsize }; -use std::sync::Arc; -use common::node_types::BlockHash; /// A configurable soak_test runner. Configure by providing the expected args as /// an environment variable. One example to run this test is: @@ -78,22 +78,24 @@ async fn run_soak_test(opts: SoakTestOpts) { // Each node tells the shard about itself: for (idx, (node_tx, _)) in nodes.iter_mut().enumerate() { - node_tx.send_json_binary(json!({ - "id":1, // Only needs to be unique per node - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain": "Polkadot", // <- so that we don't go over quota with lots of nodes. - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Node #{}", idx), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - }, - })).unwrap(); + node_tx + .send_json_binary(json!({ + "id":1, // Only needs to be unique per node + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain": "Polkadot", // <- so that we don't go over quota with lots of nodes. + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Node #{}", idx), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + })) + .unwrap(); } // Connect feeds to the core: @@ -127,12 +129,15 @@ async fn run_soak_test(opts: SoakTestOpts) { loop { // every ~1second we aim to have sent messages from all of the nodes. So we cycle through // the node IDs and send a message from each at roughly 1s / number_of_nodes. - let mut interval = tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64)); + let mut interval = + tokio::time::interval(Duration::from_secs_f64(1.0 / nodes.len() as f64)); for node_id in (0..nodes.len()).cycle() { interval.tick().await; let node_tx = &mut nodes[node_id].0; - node_tx.unbounded_send(SentMessage::StaticBinary(msg_bytes)).unwrap(); + node_tx + .unbounded_send(SentMessage::StaticBinary(msg_bytes)) + .unwrap(); bytes_in2.fetch_add(msg_bytes.len(), Ordering::Relaxed); } } @@ -162,7 +167,8 @@ async fn run_soak_test(opts: SoakTestOpts) { let bytes_in_val = bytes_in.load(Ordering::Relaxed); let bytes_out_val = bytes_out.load(Ordering::Relaxed); - println!("#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})", + println!( + "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})", n, (bytes_in_val - last_bytes_in) as f64 / one_mb, (bytes_out_val - last_bytes_out) as f64 / one_mb, @@ -193,19 +199,18 @@ struct SoakTestOpts { feeds: usize, /// The number of nodes to connect to each feed #[structopt(long)] - nodes: usize + nodes: usize, } /// Get soak test args from an envvar and parse them via structopt. fn get_soak_test_opts() -> SoakTestOpts { let arg_string = std::env::var("SOAK_TEST_ARGS") .expect("Expecting args to be provided in the env var SOAK_TEST_ARGS"); - let args = shellwords::split(&arg_string) - .expect("Could not parse SOAK_TEST_ARGS as shell arguments"); + let args = + shellwords::split(&arg_string).expect("Could not parse SOAK_TEST_ARGS as shell arguments"); // The binary name is expected to be the first arg, so fake it: - let all_args = std::iter::once("soak_test".to_owned()) - .chain(args.into_iter()); + let all_args = std::iter::once("soak_test".to_owned()).chain(args.into_iter()); SoakTestOpts::from_iter(all_args) -} \ No newline at end of file +} diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs index 2dc7e56..b112581 100644 --- a/backend/telemetry_shard/src/aggregator.rs +++ b/backend/telemetry_shard/src/aggregator.rs @@ -5,7 +5,7 @@ use common::{ node_types::BlockHash, AssignId, }; -use futures::{channel::mpsc}; +use futures::channel::mpsc; use futures::{Sink, SinkExt, StreamExt}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU64; @@ -101,7 +101,7 @@ impl Aggregator { }; if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await { // This will close the ws channels, which themselves log messages. - break + break; } } }); diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs index d058ccd..f3f99de 100644 --- a/backend/telemetry_shard/src/connection.rs +++ b/backend/telemetry_shard/src/connection.rs @@ -1,7 +1,7 @@ +use bincode::Options; +use common::ws_client; use futures::channel::mpsc; use futures::{SinkExt, StreamExt}; -use common::ws_client; -use bincode::Options; #[derive(Clone, Debug)] pub enum Message { @@ -49,7 +49,7 @@ where if let Err(e) = tx_out.send(Message::Connected).await { // If receiving end is closed, bail now. log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e); - return + return; } // Loop, forwarding messages to and from the core until something goes wrong. @@ -103,7 +103,7 @@ where } }; } - }, + } Err(connect_err) => { // Issue connecting? Wait and try again on the next loop iteration. log::error!( @@ -127,4 +127,4 @@ where }); (tx_in, rx_out) -} \ No newline at end of file +} diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 1a57d79..af81c72 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -1,5 +1,4 @@ #[warn(missing_docs)] - mod aggregator; mod connection; mod json_message; @@ -8,13 +7,13 @@ mod real_ip; use std::{collections::HashSet, net::IpAddr}; use aggregator::{Aggregator, FromWebsocket}; +use common::http_utils; use common::node_message; use futures::{channel::mpsc, SinkExt, StreamExt}; use http::Uri; +use hyper::{Method, Response}; use simple_logger::SimpleLogger; use structopt::StructOpt; -use hyper::{ Response, Method }; -use common::http_utils; const VERSION: &str = env!("CARGO_PKG_VERSION"); const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); @@ -47,7 +46,7 @@ struct Opts { /// This is important because without a limit, a single connection could exhaust /// RAM by suggesting that it accounts for billions of nodes. #[structopt(long, default_value = "20")] - max_nodes_per_connection: usize + max_nodes_per_connection: usize, } #[tokio::main] @@ -77,29 +76,35 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> { async move { match (req.method(), req.uri().path().trim_end_matches('/')) { // Check that the server is up and running: - (&Method::GET, "/health") => { - Ok(Response::new("OK".into())) - }, + (&Method::GET, "/health") => Ok(Response::new("OK".into())), // Nodes send messages here: (&Method::GET, "/submit") => { let real_addr = real_ip::real_ip(addr, req.headers()); - Ok(http_utils::upgrade_to_websocket(req, move |ws_send, ws_recv| async move { - let tx_to_aggregator = aggregator.subscribe_node(); - let (mut tx_to_aggregator, mut ws_send) - = handle_node_websocket_connection(real_addr, ws_send, ws_recv, tx_to_aggregator, max_nodes_per_connection).await; - log::info!("Closing /submit connection from {:?}", addr); - // Tell the aggregator that this connection has closed, so it can tidy up. - let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; - let _ = ws_send.close().await; - })) - }, - // 404 for anything else: - _ => { - Ok(Response::builder() - .status(404) - .body("Not found".into()) - .unwrap()) + Ok(http_utils::upgrade_to_websocket( + req, + move |ws_send, ws_recv| async move { + let tx_to_aggregator = aggregator.subscribe_node(); + let (mut tx_to_aggregator, mut ws_send) = + handle_node_websocket_connection( + real_addr, + ws_send, + ws_recv, + tx_to_aggregator, + max_nodes_per_connection, + ) + .await; + log::info!("Closing /submit connection from {:?}", addr); + // Tell the aggregator that this connection has closed, so it can tidy up. + let _ = tx_to_aggregator.send(FromWebsocket::Disconnected).await; + let _ = ws_send.close().await; + }, + )) } + // 404 for anything else: + _ => Ok(Response::builder() + .status(404) + .body("Not found".into()) + .unwrap()), } } }); @@ -114,7 +119,7 @@ async fn handle_node_websocket_connection( ws_send: http_utils::WsSender, mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, - max_nodes_per_connection: usize + max_nodes_per_connection: usize, ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index 445406e..102b1ec 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -11,4 +11,4 @@ pub mod feed_message_de; pub mod contains_matches; /// Utilities to help with running tests from within this current workspace. -pub mod workspace; \ No newline at end of file +pub mod workspace; diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs index 7643a14..14ee595 100644 --- a/backend/test_utils/src/server/channels.rs +++ b/backend/test_utils/src/server/channels.rs @@ -1,4 +1,7 @@ -use std::{ops::{Deref, DerefMut}, time::Duration}; +use std::{ + ops::{Deref, DerefMut}, + time::Duration, +}; use crate::feed_message_de::FeedMessage; use common::ws_client; @@ -51,10 +54,7 @@ impl ShardSender { self.unbounded_send(ws_client::SentMessage::Binary(bytes)) } /// Send JSON as a textual websocket message - pub fn send_json_text( - &mut self, - json: serde_json::Value, - ) -> Result<(), ws_client::SendError> { + pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.unbounded_send(ws_client::SentMessage::Text(s)) } @@ -169,7 +169,6 @@ impl FeedSender { } } - /// Wrap a `ws_client::Receiver` with convenient utility methods for feed connections pub struct FeedReceiver(ws_client::Receiver); @@ -208,23 +207,26 @@ impl FeedReceiver { /// Prefer [`FeedReceiver::recv_feed_messages`]; tests should generally be /// robust in assuming that messages may not all be delivered at once (unless we are /// specifically testing which messages are buffered together). - pub async fn recv_feed_messages_once_timeout(&mut self, timeout: Duration) -> Result, anyhow::Error> { + pub async fn recv_feed_messages_once_timeout( + &mut self, + timeout: Duration, + ) -> Result, anyhow::Error> { let msg = match tokio::time::timeout(timeout, self.0.next()).await { // Timeout elapsed; no messages back: Err(_) => return Ok(Vec::new()), // Something back; Complain if error no stream closed: - Ok(res) => res.ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))?? + Ok(res) => res.ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))??, }; match msg { ws_client::RecvMessage::Binary(data) => { let messages = FeedMessage::from_bytes(&data)?; Ok(messages) - }, + } ws_client::RecvMessage::Text(text) => { let messages = FeedMessage::from_bytes(text.as_bytes())?; Ok(messages) - }, + } } } @@ -232,7 +234,8 @@ impl FeedReceiver { /// See `recv_feed_messages_once_timeout`. pub async fn recv_feed_messages_once(&mut self) -> Result, anyhow::Error> { // Default to a timeout of 30 seconds, meaning that the test will eventually end, - self.recv_feed_messages_once_timeout(Duration::from_secs(30)).await + self.recv_feed_messages_once_timeout(Duration::from_secs(30)) + .await } /// Wait for feed messages to be sent back, building up a list of output messages until @@ -241,12 +244,16 @@ impl FeedReceiver { /// If no new messages are received within the timeout given, bail with whatever we have so far. /// This differs from `recv_feed_messages` and `recv_feed_messages_once`, which will block indefinitely /// waiting for something to arrive - pub async fn recv_feed_messages_timeout(&mut self, timeout: Duration) -> Result, anyhow::Error> { + pub async fn recv_feed_messages_timeout( + &mut self, + timeout: Duration, + ) -> Result, anyhow::Error> { // Block as long as needed for messages to start coming in: - let mut feed_messages = match tokio::time::timeout(timeout, self.recv_feed_messages_once()).await { - Ok(msgs) => msgs?, - Err(_) => return Ok(Vec::new()), - }; + let mut feed_messages = + match tokio::time::timeout(timeout, self.recv_feed_messages_once()).await { + Ok(msgs) => msgs?, + Err(_) => return Ok(Vec::new()), + }; // Then, loop a little to make sure we catch any additional messages that are sent soon after: loop { @@ -271,6 +278,7 @@ impl FeedReceiver { /// See `recv_feed_messages_timeout`. pub async fn recv_feed_messages(&mut self) -> Result, anyhow::Error> { // Default to a timeout of 30 seconds, meaning that the test will eventually end, - self.recv_feed_messages_timeout(Duration::from_secs(30)).await + self.recv_feed_messages_timeout(Duration::from_secs(30)) + .await } } diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index e4436a6..41b52df 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -36,7 +36,7 @@ pub enum StartOpts { /// Where is the process that we can subscribe to the `/feed` of? /// Eg: `127.0.0.1:3000` feed_host: String, - } + }, } /// This represents a telemetry server. It can be in different modes @@ -47,7 +47,7 @@ pub enum Server { /// A virtual shard that we can hand out. virtual_shard: ShardProcess, /// Core process that we can connect to. - core: CoreProcess + core: CoreProcess, }, ShardAndCoreMode { /// Command to run to start a new shard. @@ -67,10 +67,9 @@ pub enum Server { shards: DenseMap, /// Core process that we can connect to. core: CoreProcess, - } + }, } - #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Can't establsih connection: {0}")] @@ -88,23 +87,23 @@ pub enum Error { )] CannotAddShard, #[error("The URI provided was invalid: {0}")] - InvalidUri(#[from] http::uri::InvalidUri) + InvalidUri(#[from] http::uri::InvalidUri), } impl Server { pub fn get_core(&self) -> &CoreProcess { match self { Server::SingleProcessMode { core, .. } => core, - Server::ShardAndCoreMode { core, ..} => core, - Server::ConnectToExistingMode { core, .. } => core + Server::ShardAndCoreMode { core, .. } => core, + Server::ConnectToExistingMode { core, .. } => core, } } pub fn get_shard(&self, id: ProcessId) -> Option<&ShardProcess> { match self { Server::SingleProcessMode { virtual_shard, .. } => Some(virtual_shard), - Server::ShardAndCoreMode { shards, ..} => shards.get(id), - Server::ConnectToExistingMode { shards, .. } => shards.get(id) + Server::ShardAndCoreMode { shards, .. } => shards.get(id), + Server::ConnectToExistingMode { shards, .. } => shards.get(id), } } @@ -112,8 +111,8 @@ impl Server { let shard = match self { // Can't remove the pretend shard: Server::SingleProcessMode { .. } => return false, - Server::ShardAndCoreMode { shards, ..} => shards.remove(id), - Server::ConnectToExistingMode { shards, .. } => shards.remove(id) + Server::ShardAndCoreMode { shards, .. } => shards.remove(id), + Server::ConnectToExistingMode { shards, .. } => shards.remove(id), }; let shard = match shard { @@ -137,12 +136,9 @@ impl Server { // Run all kill futs simultaneously. let handle = tokio::spawn(async move { let (core, shards) = match self { - Server::SingleProcessMode { core, .. } - => (core, DenseMap::new()), - Server::ShardAndCoreMode { core, shards, ..} - => (core, shards), - Server::ConnectToExistingMode { core, shards, .. } - => (core, shards) + Server::SingleProcessMode { core, .. } => (core, DenseMap::new()), + Server::ShardAndCoreMode { core, shards, .. } => (core, shards), + Server::ConnectToExistingMode { core, shards, .. } => (core, shards), }; let shard_kill_futs = shards.into_iter().map(|(_, s)| s.kill()); @@ -157,15 +153,18 @@ impl Server { pub async fn add_shard(&mut self) -> Result { match self { // Always get back the same "virtual" shard; we're always just talking to the core anyway. - Server::SingleProcessMode { virtual_shard, .. } => { - Ok(virtual_shard.id) - }, + Server::SingleProcessMode { virtual_shard, .. } => Ok(virtual_shard.id), // We're connecting to an existing process. Find the next host we've been told about // round-robin style and use that as our new virtual shard. - Server::ConnectToExistingMode { submit_hosts, next_submit_host_idx, shards, .. } => { + Server::ConnectToExistingMode { + submit_hosts, + next_submit_host_idx, + shards, + .. + } => { let host = match submit_hosts.get(*next_submit_host_idx % submit_hosts.len()) { Some(host) => host, - None => return Err(Error::CannotAddShard) + None => return Err(Error::CannotAddShard), }; *next_submit_host_idx += 1; @@ -177,9 +176,13 @@ impl Server { }); Ok(pid) - }, + } // Start a new process and return that. - Server::ShardAndCoreMode { shard_command, shards, core } => { + Server::ShardAndCoreMode { + shard_command, + shards, + core, + } => { // Where is the URI we'll want to submit things to? let core_shard_submit_uri = format!("http://{}/shard_submit", core.host); @@ -224,7 +227,7 @@ impl Server { }); Ok(pid) - }, + } } } @@ -240,31 +243,35 @@ impl Server { id: ProcessId(0), host: virtual_shard_host, handle: None, - _channel_type: PhantomData - } - } - }, - StartOpts::ShardAndCore { core_command, shard_command } => { - let core_process = Server::start_core(core_command).await?; - Server::ShardAndCoreMode { - core: core_process, - shard_command, - shards: DenseMap::new() - } - }, - StartOpts::ConnectToExisting { feed_host, submit_hosts } => { - Server::ConnectToExistingMode { - submit_hosts, - next_submit_host_idx: 0, - shards: DenseMap::new(), - core: Process { - id: ProcessId(0), - host: feed_host, - handle: None, _channel_type: PhantomData, }, } } + StartOpts::ShardAndCore { + core_command, + shard_command, + } => { + let core_process = Server::start_core(core_command).await?; + Server::ShardAndCoreMode { + core: core_process, + shard_command, + shards: DenseMap::new(), + } + } + StartOpts::ConnectToExisting { + feed_host, + submit_hosts, + } => Server::ConnectToExistingMode { + submit_hosts, + next_submit_host_idx: 0, + shards: DenseMap::new(), + core: Process { + id: ProcessId(0), + host: feed_host, + handle: None, + _channel_type: PhantomData, + }, + }, }; Ok(server) @@ -340,7 +347,9 @@ impl Process { } /// Establish a raw WebSocket connection (not cancel-safe) -async fn connect_to_uri_raw(uri: &http::Uri) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> { +async fn connect_to_uri_raw( + uri: &http::Uri, +) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> { ws_client::connect(uri) .await .map(|c| c.into_raw()) @@ -371,19 +380,26 @@ impl, Recv: From> Process<(Se impl ShardProcess { /// Establish a raw connection to the process - pub async fn connect_node_raw(&self) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> { + pub async fn connect_node_raw( + &self, + ) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> { let uri = format!("http://{}/submit", self.host).parse()?; connect_to_uri_raw(&uri).await } /// Establish a connection to the process - pub async fn connect_node(&self) -> Result<(channels::ShardSender, channels::ShardReceiver), Error> { + pub async fn connect_node( + &self, + ) -> Result<(channels::ShardSender, channels::ShardReceiver), Error> { let uri = format!("http://{}/submit", self.host).parse()?; Process::connect_to_uri(&uri).await } /// Establish multiple connections to the process - pub async fn connect_multiple_nodes(&self, num_connections: usize) -> Result, Error> { + pub async fn connect_multiple_nodes( + &self, + num_connections: usize, + ) -> Result, Error> { let uri = format!("http://{}/submit", self.host).parse()?; Process::connect_multiple_to_uri(&uri, num_connections).await } @@ -391,19 +407,26 @@ impl ShardProcess { impl CoreProcess { /// Establish a raw connection to the process - pub async fn connect_feed_raw(&self) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> { + pub async fn connect_feed_raw( + &self, + ) -> Result<(ws_client::RawSender, ws_client::RawReceiver), Error> { let uri = format!("http://{}/feed", self.host).parse()?; connect_to_uri_raw(&uri).await } /// Establish a connection to the process - pub async fn connect_feed(&self) -> Result<(channels::FeedSender, channels::FeedReceiver), Error> { + pub async fn connect_feed( + &self, + ) -> Result<(channels::FeedSender, channels::FeedReceiver), Error> { let uri = format!("http://{}/feed", self.host).parse()?; Process::connect_to_uri(&uri).await } /// Establish multiple connections to the process - pub async fn connect_multiple_feeds(&self, num_connections: usize) -> Result, Error> { + pub async fn connect_multiple_feeds( + &self, + num_connections: usize, + ) -> Result, Error> { let uri = format!("http://{}/feed", self.host).parse()?; Process::connect_multiple_to_uri(&uri, num_connections).await } diff --git a/backend/test_utils/src/server/utils.rs b/backend/test_utils/src/server/utils.rs index e2ff5dd..a16e0cd 100644 --- a/backend/test_utils/src/server/utils.rs +++ b/backend/test_utils/src/server/utils.rs @@ -1,5 +1,5 @@ -use common::ws_client; use anyhow::{anyhow, Context}; +use common::ws_client; use tokio::io::BufReader; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite}; use tokio::time::Duration; @@ -45,13 +45,9 @@ pub async fn wait_for_line_containing bool> let line = match line { // timeout expired; couldn't get port: - Err(_) => { - return Err(anyhow!("Timeout elapsed waiting for text match")) - } + Err(_) => return Err(anyhow!("Timeout elapsed waiting for text match")), // Something went wrong reading line; bail: - Ok(Err(e)) => { - return Err(anyhow!("Could not read line from stdout: {}", e)) - }, + Ok(Err(e)) => return Err(anyhow!("Could not read line from stdout: {}", e)), // No more output; process ended? bail: Ok(Ok(None)) => { return Err(anyhow!( diff --git a/backend/test_utils/src/workspace/commands.rs b/backend/test_utils/src/workspace/commands.rs index df7f97a..43a87d3 100644 --- a/backend/test_utils/src/workspace/commands.rs +++ b/backend/test_utils/src/workspace/commands.rs @@ -28,7 +28,8 @@ fn telemetry_command(bin: &'static str, release_mode: bool) -> Result + pub feed_timeout: Option, } impl Default for CoreOpts { fn default() -> Self { - Self { - feed_timeout: None - } + Self { feed_timeout: None } } } /// Additional options to pass to the shard command. pub struct ShardOpts { - pub max_nodes_per_connection: Option + pub max_nodes_per_connection: Option, } impl Default for ShardOpts { fn default() -> Self { Self { - max_nodes_per_connection: None + max_nodes_per_connection: None, } } } @@ -44,12 +42,18 @@ impl Default for ShardOpts { /// - `TELEMETRY_SUBMIT_HOSTS` - hosts (comma separated) to connect to for telemetry `/submit`s. /// - `TELEMETRY_FEED_HOST` - host to connect to for feeds (eg 127.0.0.1:3000) /// -pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: ShardOpts) -> Server { +pub async fn start_server( + release_mode: bool, + core_opts: CoreOpts, + shard_opts: ShardOpts, +) -> Server { // Start to a single process: if let Ok(bin) = std::env::var("TELEMETRY_BIN") { return Server::start(server::StartOpts::SingleProcess { - command: Command::new(bin) - }).await.unwrap(); + command: Command::new(bin), + }) + .await + .unwrap(); } // Connect to a running instance: @@ -61,13 +65,18 @@ pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: S return Server::start(server::StartOpts::ConnectToExisting { feed_host, submit_hosts, - }).await.unwrap(); + }) + .await + .unwrap(); } // Build the shard command let mut shard_command = std::env::var("TELEMETRY_SHARD_BIN") .map(|val| Command::new(val)) - .unwrap_or_else(|_| commands::cargo_run_telemetry_shard(release_mode).expect("must be in rust workspace to run shard command")); + .unwrap_or_else(|_| { + commands::cargo_run_telemetry_shard(release_mode) + .expect("must be in rust workspace to run shard command") + }); // Append additional opts to the shard command if let Some(max_nodes_per_connection) = shard_opts.max_nodes_per_connection { @@ -79,7 +88,10 @@ pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: S // Build the core command let mut core_command = std::env::var("TELEMETRY_CORE_BIN") .map(|val| Command::new(val)) - .unwrap_or_else(|_| commands::cargo_run_telemetry_core(release_mode).expect("must be in rust workspace to run core command")); + .unwrap_or_else(|_| { + commands::cargo_run_telemetry_core(release_mode) + .expect("must be in rust workspace to run core command") + }); // Append additional opts to the core command if let Some(feed_timeout) = core_opts.feed_timeout { @@ -91,8 +103,10 @@ pub async fn start_server(release_mode: bool, core_opts: CoreOpts, shard_opts: S // Star the server Server::start(server::StartOpts::ShardAndCore { shard_command, - core_command - }).await.unwrap() + core_command, + }) + .await + .unwrap() } /// Start a telemetry core server in debug mode. see [`start_server`] for details. @@ -103,4 +117,4 @@ pub async fn start_server_debug() -> Server { /// Start a telemetry core server in release mode. see [`start_server`] for details. pub async fn start_server_release() -> Server { start_server(true, CoreOpts::default(), ShardOpts::default()).await -} \ No newline at end of file +}