From 74cf55174e04fa7d93c936347f132255312e4307 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Fri, 6 Aug 2021 17:42:03 +0100 Subject: [PATCH] Use rayon to speed up subscribe message serializing --- backend/Cargo.lock | 1 + backend/common/src/dense_map.rs | 4 ++ backend/telemetry_core/Cargo.toml | 3 +- backend/telemetry_core/benches/subscribe.rs | 1 + .../src/aggregator/inner_loop.rs | 52 ++++++++++++------- backend/telemetry_core/src/state/chain.rs | 4 +- backend/telemetry_core/src/state/state.rs | 4 +- backend/telemetry_core/tests/soak_tests.rs | 5 ++ 8 files changed, 49 insertions(+), 25 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 78e230a..2536d50 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1530,6 +1530,7 @@ dependencies = [ "once_cell", "parking_lot", "primitive-types", + "rayon", "reqwest", "rustc-hash", "serde", diff --git a/backend/common/src/dense_map.rs b/backend/common/src/dense_map.rs index b157a51..f3ee90f 100644 --- a/backend/common/src/dense_map.rs +++ b/backend/common/src/dense_map.rs @@ -49,6 +49,10 @@ where self.add_with(|_| item) } + pub fn as_slice(&self) -> &[Option] { + &self.items + } + pub fn add_with(&mut self, f: F) -> Id where F: FnOnce(Id) -> T, diff --git a/backend/telemetry_core/Cargo.toml b/backend/telemetry_core/Cargo.toml index 373c280..495b460 100644 --- a/backend/telemetry_core/Cargo.toml +++ b/backend/telemetry_core/Cargo.toml @@ -20,6 +20,7 @@ num_cpus = "1.13.0" once_cell = "1.8.0" parking_lot = "0.11.1" primitive-types = { version = "0.9.0", features = ["serde"] } +rayon = "1.5.1" reqwest = { version = "0.11.4", features = ["json"] } rustc-hash = "1.1.0" serde = { version = "1.0.126", features = ["derive"] } @@ -39,4 +40,4 @@ criterion = { version = "0.3.4", features = ["async", "async_tokio"] } [[bench]] name = "subscribe" -harness = false \ No newline at end of file +harness = false diff --git a/backend/telemetry_core/benches/subscribe.rs b/backend/telemetry_core/benches/subscribe.rs index 275901e..e3d0909 100644 --- a/backend/telemetry_core/benches/subscribe.rs +++ b/backend/telemetry_core/benches/subscribe.rs @@ -30,6 +30,7 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) { log_output: false, }, CoreOpts { + worker_threads: Some(2), ..Default::default() }, ShardOpts { diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs index a1efb8c..f8abe5e 100644 --- a/backend/telemetry_core/src/aggregator/inner_loop.rs +++ b/backend/telemetry_core/src/aggregator/inner_loop.rs @@ -429,30 +429,42 @@ impl InnerLoop { new_chain.finalized_block().height, new_chain.finalized_block().hash, )); - for (idx, (chain_node_id, node)) in new_chain.iter_nodes().enumerate() { - let chain_node_id = chain_node_id.into(); - - // Send subscription confirmation and chain head before doing all the nodes, - // and continue sending batches of 32 nodes a time over the wire subsequently - if idx % 32 == 0 { - if let Some(bytes) = feed_serializer.finalize() { - let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); - } - } - feed_serializer.push(feed_message::AddedNode(chain_node_id, node)); - feed_serializer.push(feed_message::FinalizedBlock( - chain_node_id, - node.finalized().height, - node.finalized().hash, - )); - if node.stale() { - feed_serializer.push(feed_message::StaleNode(chain_node_id)); - } - } if let Some(bytes) = feed_serializer.into_finalized() { let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); } + // If many (eg 10k) nodes are connected, serializing all of their info takes time. + // So, parallelise this with Rayon, but we still send out messages for each node in order + // (which is helpful for the UI as it tries to maintain a sorted list of nodes). The chunk + // size is the max number of node info we fit into 1 message; smaller messages allow the UI + // to react a little faster and not have to wait for a larger update to come in. A chunk size + // of 64 means each mesage is ~32k. + use rayon::prelude::*; + let all_feed_messages: Vec<_> = new_chain + .nodes_slice() + .par_iter() + .enumerate() + .chunks(64) + .filter_map(|nodes| { + let mut feed_serializer = FeedMessageSerializer::new(); + for (node_id, node) in nodes.iter().filter_map(|&(idx, n)| n.as_ref().map(|n| (idx, n))) { + feed_serializer.push(feed_message::AddedNode(node_id, node)); + feed_serializer.push(feed_message::FinalizedBlock( + node_id, + node.finalized().height, + node.finalized().hash, + )); + if node.stale() { + feed_serializer.push(feed_message::StaleNode(node_id)); + } + } + feed_serializer.into_finalized() + }) + .collect(); + for bytes in all_feed_messages { + let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes)); + } + // Actually make a note of the new chain subsciption: let new_genesis_hash = *new_chain.genesis_hash(); self.feed_conn_id_to_chain diff --git a/backend/telemetry_core/src/state/chain.rs b/backend/telemetry_core/src/state/chain.rs index 68fb11c..ea416f4 100644 --- a/backend/telemetry_core/src/state/chain.rs +++ b/backend/telemetry_core/src/state/chain.rs @@ -348,8 +348,8 @@ impl Chain { pub fn get_node(&self, id: ChainNodeId) -> Option<&Node> { self.nodes.get(id) } - pub fn iter_nodes(&self) -> impl Iterator { - self.nodes.iter() + pub fn nodes_slice(&self) -> &[Option] { + self.nodes.as_slice() } pub fn label(&self) -> &str { &self.labels.best() diff --git a/backend/telemetry_core/src/state/state.rs b/backend/telemetry_core/src/state/state.rs index c06f569..80e153f 100644 --- a/backend/telemetry_core/src/state/state.rs +++ b/backend/telemetry_core/src/state/state.rs @@ -286,8 +286,8 @@ impl<'a> StateChain<'a> { pub fn finalized_block(&self) -> &'a Block { self.chain.finalized_block() } - pub fn iter_nodes(&self) -> impl Iterator + 'a { - self.chain.iter_nodes() + pub fn nodes_slice(&self) -> &[Option] { + self.chain.nodes_slice() } } diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 34e8378..460d71b 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -76,6 +76,7 @@ async fn run_soak_test(opts: SoakTestOpts) { let mut server = start_server( ServerOpts { release_mode: true, + log_output: opts.log_output, ..Default::default() }, CoreOpts { @@ -262,6 +263,7 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) { let mut server = start_server( ServerOpts { release_mode: true, + log_output: opts.log_output, ..Default::default() }, CoreOpts { @@ -404,6 +406,9 @@ struct SoakTestOpts { /// Number of worker threads each shard will use #[structopt(long)] shard_worker_threads: Option, + /// Should we log output from the core/shards to stdout? + #[structopt(long)] + log_output: bool } /// Get soak test args from an envvar and parse them via structopt.