Use rayon to speed up subscribe message serializing

This commit is contained in:
James Wilson
2021-08-06 17:42:03 +01:00
parent 0788270756
commit 74cf55174e
8 changed files with 49 additions and 25 deletions
+1
View File
@@ -1530,6 +1530,7 @@ dependencies = [
"once_cell",
"parking_lot",
"primitive-types",
"rayon",
"reqwest",
"rustc-hash",
"serde",
+4
View File
@@ -49,6 +49,10 @@ where
self.add_with(|_| item)
}
pub fn as_slice(&self) -> &[Option<T>] {
&self.items
}
pub fn add_with<F>(&mut self, f: F) -> Id
where
F: FnOnce(Id) -> T,
+2 -1
View File
@@ -20,6 +20,7 @@ num_cpus = "1.13.0"
once_cell = "1.8.0"
parking_lot = "0.11.1"
primitive-types = { version = "0.9.0", features = ["serde"] }
rayon = "1.5.1"
reqwest = { version = "0.11.4", features = ["json"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.126", features = ["derive"] }
@@ -39,4 +40,4 @@ criterion = { version = "0.3.4", features = ["async", "async_tokio"] }
[[bench]]
name = "subscribe"
harness = false
harness = false
@@ -30,6 +30,7 @@ pub fn benchmark_subscribe_speed(c: &mut Criterion) {
log_output: false,
},
CoreOpts {
worker_threads: Some(2),
..Default::default()
},
ShardOpts {
@@ -429,30 +429,42 @@ impl InnerLoop {
new_chain.finalized_block().height,
new_chain.finalized_block().hash,
));
for (idx, (chain_node_id, node)) in new_chain.iter_nodes().enumerate() {
let chain_node_id = chain_node_id.into();
// Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
if let Some(bytes) = feed_serializer.finalize() {
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
}
feed_serializer.push(feed_message::AddedNode(chain_node_id, node));
feed_serializer.push(feed_message::FinalizedBlock(
chain_node_id,
node.finalized().height,
node.finalized().hash,
));
if node.stale() {
feed_serializer.push(feed_message::StaleNode(chain_node_id));
}
}
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
// If many (eg 10k) nodes are connected, serializing all of their info takes time.
// So, parallelise this with Rayon, but we still send out messages for each node in order
// (which is helpful for the UI as it tries to maintain a sorted list of nodes). The chunk
// size is the max number of node info we fit into 1 message; smaller messages allow the UI
// to react a little faster and not have to wait for a larger update to come in. A chunk size
// of 64 means each mesage is ~32k.
use rayon::prelude::*;
let all_feed_messages: Vec<_> = new_chain
.nodes_slice()
.par_iter()
.enumerate()
.chunks(64)
.filter_map(|nodes| {
let mut feed_serializer = FeedMessageSerializer::new();
for (node_id, node) in nodes.iter().filter_map(|&(idx, n)| n.as_ref().map(|n| (idx, n))) {
feed_serializer.push(feed_message::AddedNode(node_id, node));
feed_serializer.push(feed_message::FinalizedBlock(
node_id,
node.finalized().height,
node.finalized().hash,
));
if node.stale() {
feed_serializer.push(feed_message::StaleNode(node_id));
}
}
feed_serializer.into_finalized()
})
.collect();
for bytes in all_feed_messages {
let _ = feed_channel.unbounded_send(ToFeedWebsocket::Bytes(bytes));
}
// Actually make a note of the new chain subsciption:
let new_genesis_hash = *new_chain.genesis_hash();
self.feed_conn_id_to_chain
+2 -2
View File
@@ -348,8 +348,8 @@ impl Chain {
pub fn get_node(&self, id: ChainNodeId) -> Option<&Node> {
self.nodes.get(id)
}
pub fn iter_nodes(&self) -> impl Iterator<Item = (ChainNodeId, &Node)> {
self.nodes.iter()
pub fn nodes_slice(&self) -> &[Option<Node>] {
self.nodes.as_slice()
}
pub fn label(&self) -> &str {
&self.labels.best()
+2 -2
View File
@@ -286,8 +286,8 @@ impl<'a> StateChain<'a> {
pub fn finalized_block(&self) -> &'a Block {
self.chain.finalized_block()
}
pub fn iter_nodes(&self) -> impl Iterator<Item = (ChainNodeId, &'a Node)> + 'a {
self.chain.iter_nodes()
pub fn nodes_slice(&self) -> &[Option<Node>] {
self.chain.nodes_slice()
}
}
@@ -76,6 +76,7 @@ async fn run_soak_test(opts: SoakTestOpts) {
let mut server = start_server(
ServerOpts {
release_mode: true,
log_output: opts.log_output,
..Default::default()
},
CoreOpts {
@@ -262,6 +263,7 @@ async fn run_realistic_soak_test(opts: SoakTestOpts) {
let mut server = start_server(
ServerOpts {
release_mode: true,
log_output: opts.log_output,
..Default::default()
},
CoreOpts {
@@ -404,6 +406,9 @@ struct SoakTestOpts {
/// Number of worker threads each shard will use
#[structopt(long)]
shard_worker_threads: Option<usize>,
/// Should we log output from the core/shards to stdout?
#[structopt(long)]
log_output: bool
}
/// Get soak test args from an envvar and parse them via structopt.