diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs index 3e59f81..f101c47 100644 --- a/backend/common/src/ws_client/connect.rs +++ b/backend/common/src/ws_client/connect.rs @@ -13,13 +13,13 @@ // // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use super::on_close::OnClose; use futures::channel::mpsc; use futures::{SinkExt, StreamExt}; use soketto::handshake::{Client, ServerResponse}; +use std::sync::Arc; use tokio::net::TcpStream; use tokio_util::compat::TokioAsyncReadCompatExt; -use std::sync::Arc; -use super::on_close::OnClose; use super::{ receiver::{Receiver, RecvMessage}, @@ -89,7 +89,10 @@ impl Connection { Err(e) => { // The socket had an error, so notify interested parties that we should // shut the connection down and bail out of this receive loop. - log::error!("Shutting down websocket connection: Failed to receive data: {}", e); + log::error!( + "Shutting down websocket connection: Failed to receive data: {}", + e + ); let _ = tx_closed1.send(()); break; } @@ -198,14 +201,16 @@ impl Connection { // been dropped. If both have, we close the socket connection. let on_close = Arc::new(OnClose(tx_closed2)); - (Sender { - inner: tx_to_ws, - closer: Arc::clone(&on_close), - }, - Receiver { - inner: rx_from_ws, - closer: on_close, - }) + ( + Sender { + inner: tx_to_ws, + closer: Arc::clone(&on_close), + }, + Receiver { + inner: rx_from_ws, + closer: on_close, + }, + ) } } diff --git a/backend/common/src/ws_client/mod.rs b/backend/common/src/ws_client/mod.rs index ea12913..f2c66d5 100644 --- a/backend/common/src/ws_client/mod.rs +++ b/backend/common/src/ws_client/mod.rs @@ -16,12 +16,12 @@ /// Functionality to establish a connection mod connect; +/// A close helper that we use in sender/receiver. +mod on_close; /// The channel based receive interface mod receiver; /// The channel based send interface mod sender; -/// A close helper that we use in sender/receiver. -mod on_close; pub use connect::{connect, ConnectError, Connection, RawReceiver, RawSender}; pub use receiver::{Receiver, RecvError, RecvMessage}; diff --git a/backend/common/src/ws_client/on_close.rs b/backend/common/src/ws_client/on_close.rs index 0c03ce0..4f6e4e8 100644 --- a/backend/common/src/ws_client/on_close.rs +++ b/backend/common/src/ws_client/on_close.rs @@ -23,4 +23,4 @@ impl Drop for OnClose { fn drop(&mut self) { let _ = self.0.send(()); } -} \ No newline at end of file +} diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs index cdd4912..6f45da7 100644 --- a/backend/common/src/ws_client/receiver.rs +++ b/backend/common/src/ws_client/receiver.rs @@ -14,10 +14,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use super::on_close::OnClose; use futures::channel::mpsc; use futures::{Stream, StreamExt}; use std::sync::Arc; -use super::on_close::OnClose; /// Receive messages out of a connection pub struct Receiver { @@ -32,7 +32,7 @@ pub enum RecvError { #[error("Stream finished")] StreamFinished, #[error("Failed to send close message")] - CloseError + CloseError, } impl Receiver { diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs index 71872a4..45c3d66 100644 --- a/backend/common/src/ws_client/sender.rs +++ b/backend/common/src/ws_client/sender.rs @@ -14,10 +14,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use super::on_close::OnClose; use futures::channel::mpsc; use futures::{Sink, SinkExt}; use std::sync::Arc; -use super::on_close::OnClose; /// A message that can be sent into the channel interface #[derive(Debug, Clone)] @@ -70,7 +70,7 @@ pub enum SendError { #[error("Failed to send message: {0}")] ChannelError(#[from] mpsc::SendError), #[error("Failed to send close message")] - CloseError + CloseError, } impl Sink for Sender { @@ -85,9 +85,7 @@ impl Sink for Sender { mut self: std::pin::Pin<&mut Self>, item: SentMessage, ) -> Result<(), Self::Error> { - self.inner - .start_send_unpin(item) - .map_err(|e| e.into()) + self.inner.start_send_unpin(item).map_err(|e| e.into()) } fn poll_flush( mut self: std::pin::Pin<&mut Self>, diff --git a/backend/telemetry_core/benches/subscribe.rs b/backend/telemetry_core/benches/subscribe.rs index e3d0909..743606a 100644 --- a/backend/telemetry_core/benches/subscribe.rs +++ b/backend/telemetry_core/benches/subscribe.rs @@ -1,10 +1,10 @@ use common::node_types::BlockHash; use criterion::{criterion_group, criterion_main, Criterion}; use serde_json::json; -use test_utils::workspace::{ start_server, ServerOpts, CoreOpts, ShardOpts }; -use test_utils::feed_message_de::FeedMessage; -use tokio::runtime::Runtime; use std::time::{Duration, Instant}; +use test_utils::feed_message_de::FeedMessage; +use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts}; +use tokio::runtime::Runtime; /// This benchmark roughly times the subscribe function. Note that there's a lot of /// overhead in other areas, so even with the entire subscribe function commented out @@ -19,10 +19,8 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { let rt = Runtime::new().expect("tokio runtime should start"); - c.bench_function( - "subscribe speed: time till pong", - move |b| b.to_async(&rt).iter_custom(|iters| async move { - + c.bench_function("subscribe speed: time till pong", move |b| { + b.to_async(&rt).iter_custom(|iters| async move { // Start a server: let mut server = start_server( ServerOpts { @@ -38,8 +36,9 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { max_node_data_per_second: Some(usize::MAX), worker_threads: Some(2), ..Default::default() - } - ).await; + }, + ) + .await; let shard_id = server.add_shard().await.unwrap(); // Connect a shard: @@ -52,22 +51,24 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { // Add a bunch of actual nodes on the same chain: for n in 0..NUMBER_OF_NODES { - node_tx.send_json_text(json!({ - "id":n, - "ts":"2021-07-12T10:37:47.714666+01:00", - "payload": { - "authority":true, - "chain":"Polkadot", // No limit to #nodes on this network. - "config":"", - "genesis_hash": BlockHash::from_low_u64_ne(1), - "implementation":"Substrate Node", - "msg":"system.connected", - "name": format!("Node {}", n), - "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", - "startup_time":"1625565542717", - "version":"2.0.0-07a1af348-aarch64-macos" - } - })).unwrap(); + node_tx + .send_json_text(json!({ + "id":n, + "ts":"2021-07-12T10:37:47.714666+01:00", + "payload": { + "authority":true, + "chain":"Polkadot", // No limit to #nodes on this network. + "config":"", + "genesis_hash": BlockHash::from_low_u64_ne(1), + "implementation":"Substrate Node", + "msg":"system.connected", + "name": format!("Node {}", n), + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1625565542717", + "version":"2.0.0-07a1af348-aarch64-macos" + } + })) + .unwrap(); } // Give those messages a chance to be handled. This, of course, @@ -79,7 +80,6 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { // iters performed here, but a lot of the time that number is "1". let mut total_time = Duration::ZERO; for _n in 0..iters { - // Start a bunch of feeds: let mut feeds = server .get_core() @@ -94,7 +94,9 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { // Then, Ping a feed: feeds[0].0.send_command("ping", "Finished!").unwrap(); - let finished = FeedMessage::Pong { msg: "Finished!".to_owned() }; + let finished = FeedMessage::Pong { + msg: "Finished!".to_owned(), + }; // Wait and see how long it takes to get a pong back: let start = Instant::now(); @@ -115,7 +117,7 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { // The total time spent waiting for subscribes: total_time }) - ); + }); } criterion_group!(benches, benchmark_subscribe_speed); diff --git a/backend/telemetry_core/src/aggregator/aggregator.rs b/backend/telemetry_core/src/aggregator/aggregator.rs index c8c21af..3f90131 100644 --- a/backend/telemetry_core/src/aggregator/aggregator.rs +++ b/backend/telemetry_core/src/aggregator/aggregator.rs @@ -113,8 +113,10 @@ impl Aggregator { /// Return a sink that a feed can send messages into to be handled by the aggregator. pub fn subscribe_feed( &self, - ) -> (u64, impl Sink + Send + Sync + Unpin + 'static) - { + ) -> ( + u64, + impl Sink + Send + Sync + Unpin + 'static, + ) { // Assign a unique aggregator-local ID to each connection that subscribes, and pass // that along with every message to the aggregator loop: let feed_conn_id = self @@ -125,11 +127,14 @@ impl Aggregator { // Calling `send` on this Sink requires Unpin. There may be a nicer way than this, // but pinning by boxing is the easy solution for now: - (feed_conn_id, Box::pin(tx_to_aggregator.with(move |msg| async move { - Ok(inner_loop::ToAggregator::FromFeedWebsocket( - feed_conn_id.into(), - msg, - )) - }))) + ( + feed_conn_id, + Box::pin(tx_to_aggregator.with(move |msg| async move { + Ok(inner_loop::ToAggregator::FromFeedWebsocket( + feed_conn_id.into(), + msg, + )) + })), + ) } } diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index f8abe5e..813c740 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -447,7 +447,10 @@ impl InnerLoop { .chunks(64) .filter_map(|nodes| { let mut feed_serializer = FeedMessageSerializer::new(); - for (node_id, node) in nodes.iter().filter_map(|&(idx, n)| n.as_ref().map(|n| (idx, n))) { + for (node_id, node) in nodes + .iter() + .filter_map(|&(idx, n)| n.as_ref().map(|n| (idx, n))) + { feed_serializer.push(feed_message::AddedNode(node_id, node)); feed_serializer.push(feed_message::FinalizedBlock( node_id, diff --git a/backend/telemetry_core/src/feed_message.rs b/backend/telemetry_core/src/feed_message.rs index 0b28ed3..1b5c14e 100644 --- a/backend/telemetry_core/src/feed_message.rs +++ b/backend/telemetry_core/src/feed_message.rs @@ -18,7 +18,6 @@ //! send to subscribed feeds (browsers). use serde::Serialize; -use std::mem; use crate::state::Node; use common::node_types::{ @@ -82,20 +81,6 @@ impl FeedMessageSerializer { let _ = to_writer(&mut self.buffer, value); } - /// Return the bytes we've serialized so far and prepare a new buffer. If you're - /// finished serializing data, prefer [`FeedMessageSerializer::into_finalized`] - pub fn finalize(&mut self) -> Option { - if self.buffer.is_empty() { - return None; - } - - self.buffer.push(b']'); - - let bytes = mem::replace(&mut self.buffer, Vec::with_capacity(BUFCAP)); - - Some(bytes.into()) - } - /// Return the bytes that we've serialized so far, consuming the serializer. pub fn into_finalized(mut self) -> Option { if self.buffer.is_empty() { diff --git a/backend/telemetry_core/src/main.rs b/backend/telemetry_core/src/main.rs index 0ac46ab..0c38214 100644 --- a/backend/telemetry_core/src/main.rs +++ b/backend/telemetry_core/src/main.rs @@ -77,7 +77,7 @@ fn main() { let worker_threads = match opts.worker_threads { 0 => num_cpus::get(), - n => n + n => n, }; tokio::runtime::Builder::new_multi_thread() @@ -308,7 +308,7 @@ async fn handle_feed_websocket_connection( mut ws_recv: http_utils::WsReceiver, mut tx_to_aggregator: S, feed_timeout: u64, - _feed_id: u64 // <- can be useful for debugging purposes. + _feed_id: u64, // <- can be useful for debugging purposes. ) -> (S, http_utils::WsSender) where S: futures::Sink + Unpin + Send + 'static, diff --git a/backend/telemetry_core/tests/e2e_tests.rs b/backend/telemetry_core/tests/e2e_tests.rs index ee426a5..b3d8b1b 100644 --- a/backend/telemetry_core/tests/e2e_tests.rs +++ b/backend/telemetry_core/tests/e2e_tests.rs @@ -37,7 +37,7 @@ use std::time::Duration; use test_utils::{ assert_contains_matches, feed_message_de::{FeedMessage, NodeDetails}, - workspace::{start_server, start_server_debug, ServerOpts, CoreOpts, ShardOpts}, + workspace::{start_server, start_server_debug, CoreOpts, ServerOpts, ShardOpts}, }; /// The simplest test we can run; the main benefit of this test (since we check similar) diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 460d71b..51faef4 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -36,13 +36,13 @@ box; MacOS seems to hit limits quicker in general. use common::node_types::BlockHash; use common::ws_client::SentMessage; -use futures::{StreamExt, future}; +use futures::{future, StreamExt}; use serde_json::json; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; -use test_utils::workspace::{start_server, ServerOpts, CoreOpts, ShardOpts}; +use test_utils::workspace::{start_server, CoreOpts, ServerOpts, ShardOpts}; /// A configurable soak_test runner. Configure by providing the expected args as /// an environment variable. One example to run this test is: @@ -87,7 +87,8 @@ async fn run_soak_test(opts: SoakTestOpts) { worker_threads: opts.shard_worker_threads, ..Default::default() }, - ).await; + ) + .await; println!("Telemetry core running at {}", server.get_core().host()); // Start up the shards we requested: @@ -274,7 +275,8 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { worker_threads: opts.shard_worker_threads, ..Default::default() }, - ).await; + ) + .await; println!("Telemetry core running at {}", server.get_core().host()); // Start up the shards we requested: @@ -307,14 +309,16 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { Duration::from_secs(3), format!("Node {}", idx + 1), "Polkadot".to_owned(), - idx + 1 + idx + 1, ); - let res = telemetry.start(|msg| async { - bytes_in.fetch_add(msg.len(), Ordering::Relaxed); - tx.unbounded_send(SentMessage::Binary(msg))?; - Ok::<_, anyhow::Error>(()) - }).await; + let res = telemetry + .start(|msg| async { + bytes_in.fetch_add(msg.len(), Ordering::Relaxed); + tx.unbounded_send(SentMessage::Binary(msg))?; + Ok::<_, anyhow::Error>(()) + }) + .await; if let Err(e) = res { log::error!("Telemetry Node #{} has died with error: {}", idx, e); @@ -408,7 +412,7 @@ struct SoakTestOpts { shard_worker_threads: Option, /// Should we log output from the core/shards to stdout? #[structopt(long)] - log_output: bool + log_output: bool, } /// Get soak test args from an envvar and parse them via structopt. diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs index 27b464b..dc8723d 100644 --- a/backend/telemetry_shard/src/main.rs +++ b/backend/telemetry_shard/src/main.rs @@ -98,7 +98,7 @@ fn main() { let worker_threads = match opts.worker_threads { 0 => num_cpus::get(), - n => n + n => n, }; tokio::runtime::Builder::new_multi_thread() diff --git a/backend/test_utils/src/fake_telemetry.rs b/backend/test_utils/src/fake_telemetry.rs index c91d51a..dfcaca3 100644 --- a/backend/test_utils/src/fake_telemetry.rs +++ b/backend/test_utils/src/fake_telemetry.rs @@ -1,9 +1,9 @@ -use std::time::Duration; -use std::future::Future; -use serde_json::json; +use ::time::{format_description::well_known::Rfc3339, OffsetDateTime}; use common::node_types::BlockHash; +use serde_json::json; +use std::future::Future; +use std::time::Duration; use tokio::time::{self, MissedTickBehavior}; -use ::time::{ OffsetDateTime, format_description::well_known::Rfc3339 }; /// This emits fake but realistic looking telemetry messages. /// Can be connected to a telemetry server to emit realistic messages. @@ -11,7 +11,7 @@ pub struct FakeTelemetry { block_time: Duration, node_name: String, chain: String, - message_id: usize + message_id: usize, } impl FakeTelemetry { @@ -20,7 +20,7 @@ impl FakeTelemetry { block_time, node_name, chain, - message_id + message_id, } } @@ -33,9 +33,8 @@ impl FakeTelemetry { where Func: Send + FnMut(Vec) -> Fut, Fut: Future>, - E: Into + E: Into, { - let id = self.message_id; let name = self.node_name; let chain = self.chain; @@ -94,10 +93,12 @@ impl FakeTelemetry { let mut new_block_every = time::interval_at(now + block_time, block_time); new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst); - let mut system_interval_every = time::interval_at(now + Duration::from_secs(2), block_time * 2); + let mut system_interval_every = + time::interval_at(now + Duration::from_secs(2), block_time * 2); new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst); - let mut finalised_every = time::interval_at(now + Duration::from_secs(1) + block_time * 3, block_time); + let mut finalised_every = + time::interval_at(now + Duration::from_secs(1) + block_time * 3, block_time); new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst); // Send messages every interval: @@ -192,6 +193,6 @@ fn now_iso() -> String { /// Spread the u64 across the resulting u256 hash so that it's /// more visible in the UI. fn block_hash(n: u64) -> BlockHash { - let a: [u8; 32] = unsafe { std::mem::transmute([n,n,n,n]) }; + let a: [u8; 32] = unsafe { std::mem::transmute([n, n, n, n]) }; BlockHash::from(a) -} \ No newline at end of file +} diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index 1e9d119..bf0de0f 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -30,4 +30,4 @@ pub mod contains_matches; pub mod workspace; /// A utility to generate fake telemetry messages at realistic intervals. -pub mod fake_telemetry; \ No newline at end of file +pub mod fake_telemetry; diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index ac00af5..2934224 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -70,7 +70,7 @@ pub struct Server { /// Core process that we can connect to. core: CoreProcess, /// Things that vary based on the mode we are in. - mode: ServerMode + mode: ServerMode, } pub enum ServerMode { SingleProcessMode { @@ -258,7 +258,10 @@ impl Server { /// Start a server. pub async fn start(opts: StartOpts) -> Result { let server = match opts { - StartOpts::SingleProcess { command, log_output } => { + StartOpts::SingleProcess { + command, + log_output, + } => { let core_process = Server::start_core(log_output, command).await?; let virtual_shard_host = core_process.host.clone(); Server { @@ -271,13 +274,13 @@ impl Server { handle: None, _channel_type: PhantomData, }, - } + }, } } StartOpts::ShardAndCore { core_command, shard_command, - log_output + log_output, } => { let core_process = Server::start_core(log_output, core_command).await?; Server { @@ -286,13 +289,13 @@ impl Server { mode: ServerMode::ShardAndCoreMode { shard_command, shards: DenseMap::new(), - } + }, } } StartOpts::ConnectToExisting { feed_host, submit_hosts, - log_output + log_output, } => Server { log_output, core: Process { @@ -305,7 +308,7 @@ impl Server { submit_hosts, next_submit_host_idx: 0, shards: DenseMap::new(), - } + }, }, }; diff --git a/backend/test_utils/src/workspace/start_server.rs b/backend/test_utils/src/workspace/start_server.rs index d34c7dc..10d0f6f 100644 --- a/backend/test_utils/src/workspace/start_server.rs +++ b/backend/test_utils/src/workspace/start_server.rs @@ -20,14 +20,14 @@ use crate::server::{self, Command, Server}; /// Options for the server pub struct ServerOpts { pub release_mode: bool, - pub log_output: bool + pub log_output: bool, } impl Default for ServerOpts { fn default() -> Self { Self { release_mode: false, - log_output: true, + log_output: false, } } } @@ -42,7 +42,7 @@ impl Default for CoreOpts { fn default() -> Self { Self { feed_timeout: None, - worker_threads: None + worker_threads: None, } } } @@ -61,7 +61,7 @@ impl Default for ShardOpts { max_nodes_per_connection: None, max_node_data_per_second: None, node_block_seconds: None, - worker_threads: None + worker_threads: None, } } } @@ -92,7 +92,7 @@ pub async fn start_server( if let Ok(bin) = std::env::var("TELEMETRY_BIN") { return Server::start(server::StartOpts::SingleProcess { command: Command::new(bin), - log_output: server_opts.log_output + log_output: server_opts.log_output, }) .await .unwrap(); @@ -107,7 +107,7 @@ pub async fn start_server( return Server::start(server::StartOpts::ConnectToExisting { feed_host, submit_hosts, - log_output: server_opts.log_output + log_output: server_opts.log_output, }) .await .unwrap(); @@ -138,9 +138,7 @@ pub async fn start_server( .arg(val.to_string()); } if let Some(val) = shard_opts.worker_threads { - shard_command = shard_command - .arg("--worker-threads") - .arg(val.to_string()); + shard_command = shard_command.arg("--worker-threads").arg(val.to_string()); } // Build the core command @@ -163,7 +161,7 @@ pub async fn start_server( Server::start(server::StartOpts::ShardAndCore { shard_command, core_command, - log_output: server_opts.log_output + log_output: server_opts.log_output, }) .await .unwrap() @@ -171,10 +169,20 @@ pub async fn start_server( /// Start a telemetry core server in debug mode. see [`start_server`] for details. pub async fn start_server_debug() -> Server { - start_server(ServerOpts::default(), CoreOpts::default(), ShardOpts::default()).await + start_server( + ServerOpts::default(), + CoreOpts::default(), + ShardOpts::default(), + ) + .await } /// Start a telemetry core server in release mode. see [`start_server`] for details. pub async fn start_server_release() -> Server { - start_server(ServerOpts::default(), CoreOpts::default(), ShardOpts::default()).await + start_server( + ServerOpts::default(), + CoreOpts::default(), + ShardOpts::default(), + ) + .await }