From 47e46d178bf7e433d421f34f2f223490db59e576 Mon Sep 17 00:00:00 2001
From: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Date: Thu, 25 Jan 2024 19:02:24 +0200
Subject: [PATCH] Add subsystem benchmarks for `availability-distribution` and
`bitfield-distribution` (availability write) (#2970)
Introduce a new test objective: `DataAvailabilityWrite`.
The new benchmark measures the network and cpu usage of
`availability-distribution`, `bitfield-distribution` and
`availability-store` subsystems from the perspective of a validator node
during the process when candidates are made available.
Additionally I refactored the networking emulation to support bandwidth
accounting and limits of incoming and outgoing requests.
Screenshot of successful run
---------
Signed-off-by: Andrei Sandu
---
Cargo.lock | 16 +-
Cargo.toml | 1 +
polkadot/node/subsystem-bench/Cargo.toml | 18 +-
polkadot/node/subsystem-bench/README.md | 56 +-
.../examples/availability_read.yaml | 36 +-
.../examples/availability_write.yaml | 15 +
.../src/availability/av_store_helpers.rs | 57 +
.../subsystem-bench/src/availability/mod.rs | 463 ++++++--
polkadot/node/subsystem-bench/src/cli.rs | 4 +-
.../subsystem-bench/src/core/configuration.rs | 92 +-
.../node/subsystem-bench/src/core/display.rs | 5 +-
.../subsystem-bench/src/core/environment.rs | 165 ++-
.../node/subsystem-bench/src/core/keyring.rs | 38 +-
.../subsystem-bench/src/core/mock/av_store.rs | 82 +-
.../src/core/mock/chain_api.rs | 92 ++
.../subsystem-bench/src/core/mock/dummy.rs | 1 +
.../node/subsystem-bench/src/core/mock/mod.rs | 3 +
.../src/core/mock/network_bridge.rs | 379 +++----
.../src/core/mock/runtime_api.rs | 99 +-
.../node/subsystem-bench/src/core/network.rs | 994 ++++++++++++++----
.../subsystem-bench/src/subsystem-bench.rs | 137 ++-
prdoc/pr_2970.prdoc | 15 +
22 files changed, 1967 insertions(+), 801 deletions(-)
create mode 100644 polkadot/node/subsystem-bench/examples/availability_write.yaml
create mode 100644 polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs
create mode 100644 polkadot/node/subsystem-bench/src/core/mock/chain_api.rs
create mode 100644 prdoc/pr_2970.prdoc
diff --git a/Cargo.lock b/Cargo.lock
index dbccf3e7d7..f7f1dcf606 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13454,6 +13454,7 @@ version = "1.0.0"
dependencies = [
"assert_matches",
"async-trait",
+ "bitvec",
"clap 4.4.18",
"clap-num",
"color-eyre",
@@ -13462,12 +13463,17 @@ dependencies = [
"futures",
"futures-timer",
"itertools 0.11.0",
+ "kvdb-memorydb",
"log",
"orchestra",
"parity-scale-codec",
"paste",
+ "polkadot-availability-bitfield-distribution",
+ "polkadot-availability-distribution",
"polkadot-availability-recovery",
"polkadot-erasure-coding",
+ "polkadot-node-core-av-store",
+ "polkadot-node-core-chain-api",
"polkadot-node-metrics",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
@@ -13482,12 +13488,14 @@ dependencies = [
"pyroscope",
"pyroscope_pprofrs",
"rand",
+ "rand_distr",
"sc-keystore",
"sc-network",
"sc-service",
"serde",
"serde_yaml",
"sp-application-crypto",
+ "sp-consensus",
"sp-core",
"sp-keyring",
"sp-keystore",
@@ -17079,9 +17087,9 @@ dependencies = [
[[package]]
name = "serde_yaml"
-version = "0.9.30"
+version = "0.9.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38"
+checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c"
dependencies = [
"indexmap 2.0.0",
"itoa",
@@ -20752,9 +20760,9 @@ dependencies = [
[[package]]
name = "unsafe-libyaml"
-version = "0.2.10"
+version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b"
+checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa"
[[package]]
name = "unsigned-varint"
diff --git a/Cargo.toml b/Cargo.toml
index 20cc16039f..1edc64217f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -159,6 +159,7 @@ members = [
"polkadot/node/gum/proc-macro",
"polkadot/node/jaeger",
"polkadot/node/malus",
+ "polkadot/node/subsystem-bench",
"polkadot/node/metrics",
"polkadot/node/network/approval-distribution",
"polkadot/node/network/availability-distribution",
diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml
index 750f7a7e2f..40702411d8 100644
--- a/polkadot/node/subsystem-bench/Cargo.toml
+++ b/polkadot/node/subsystem-bench/Cargo.toml
@@ -22,12 +22,16 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-primitives = { path = "../../primitives" }
polkadot-node-network-protocol = { path = "../network/protocol" }
-polkadot-availability-recovery = { path = "../network/availability-recovery", features = ["subsystem-benchmarks"] }
+polkadot-availability-recovery = { path = "../network/availability-recovery", features=["subsystem-benchmarks"]}
+polkadot-availability-distribution = { path = "../network/availability-distribution"}
+polkadot-node-core-av-store = { path = "../core/av-store"}
+polkadot-node-core-chain-api = { path = "../core/chain-api"}
+polkadot-availability-bitfield-distribution = { path = "../network/bitfield-distribution"}
color-eyre = { version = "0.6.1", default-features = false }
-polkadot-overseer = { path = "../overseer" }
+polkadot-overseer = { path = "../overseer" }
colored = "2.0.4"
assert_matches = "1.5"
-async-trait = "0.1.74"
+async-trait = "0.1.57"
sp-keystore = { path = "../../../substrate/primitives/keystore" }
sc-keystore = { path = "../../../substrate/client/keystore" }
sp-core = { path = "../../../substrate/primitives/core" }
@@ -39,7 +43,12 @@ polkadot-erasure-coding = { package = "polkadot-erasure-coding", path = "../../e
log = "0.4.17"
env_logger = "0.9.0"
rand = "0.8.5"
-parity-scale-codec = { version = "3.6.1", features = ["derive", "std"] }
+# `rand` only supports uniform distribution, we need normal distribution for latency.
+rand_distr = "0.4.3"
+bitvec="1.0.1"
+kvdb-memorydb = "0.13.0"
+
+parity-scale-codec = { version = "3.6.1", features = ["std", "derive"] }
tokio = "1.24.2"
clap-num = "1.0.2"
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
@@ -47,6 +56,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
sp-application-crypto = { path = "../../../substrate/primitives/application-crypto" }
sc-network = { path = "../../../substrate/client/network" }
sc-service = { path = "../../../substrate/client/service" }
+sp-consensus = { path = "../../../substrate/primitives/consensus/common" }
polkadot-node-metrics = { path = "../metrics" }
itertools = "0.11.0"
polkadot-primitives-test-helpers = { path = "../../primitives/test-helpers" }
diff --git a/polkadot/node/subsystem-bench/README.md b/polkadot/node/subsystem-bench/README.md
index 1ff5b129e1..e090a0392c 100644
--- a/polkadot/node/subsystem-bench/README.md
+++ b/polkadot/node/subsystem-bench/README.md
@@ -1,6 +1,6 @@
# Subsystem benchmark client
-Run parachain consensus stress and performance tests on your development machine.
+Run parachain consensus stress and performance tests on your development machine.
## Motivation
@@ -111,30 +111,28 @@ Commands:
```
Note: `test-sequence` is a special test objective that wraps up an arbitrary number of test objectives. It is tipically
-used to run a suite of tests defined in a `yaml` file like in this [example](examples/availability_read.yaml).
+ used to run a suite of tests defined in a `yaml` file like in this [example](examples/availability_read.yaml).
### Standard test options
-
+
```
-Options:
- --network The type of network to be emulated [default: ideal] [possible
- values: ideal, healthy, degraded]
- --n-cores Number of cores to fetch availability for [default: 100]
- --n-validators Number of validators to fetch chunks from [default: 500]
- --min-pov-size The minimum pov size in KiB [default: 5120]
- --max-pov-size The maximum pov size bytes [default: 5120]
--n, --num-blocks The number of blocks the test is going to run [default: 1]
--p, --peer-bandwidth The bandwidth of simulated remote peers in KiB
--b, --bandwidth The bandwidth of our simulated node in KiB
- --peer-error Simulated conection error ratio [0-100]
- --peer-min-latency Minimum remote peer latency in milliseconds [0-5000]
- --peer-max-latency Maximum remote peer latency in milliseconds [0-5000]
- --profile Enable CPU Profiling with Pyroscope
- --pyroscope-url Pyroscope Server URL [default: http://localhost:4040]
- --pyroscope-sample-rate Pyroscope Sample Rate [default: 113]
- --cache-misses Enable Cache Misses Profiling with Valgrind. Linux only, Valgrind
- must be in the PATH
--h, --help Print help
+ --network The type of network to be emulated [default: ideal] [possible values: ideal, healthy,
+ degraded]
+ --n-cores Number of cores to fetch availability for [default: 100]
+ --n-validators Number of validators to fetch chunks from [default: 500]
+ --min-pov-size The minimum pov size in KiB [default: 5120]
+ --max-pov-size The maximum pov size bytes [default: 5120]
+ -n, --num-blocks The number of blocks the test is going to run [default: 1]
+ -p, --peer-bandwidth The bandwidth of emulated remote peers in KiB
+ -b, --bandwidth The bandwidth of our node in KiB
+ --connectivity Emulated peer connection ratio [0-100]
+ --peer-mean-latency Mean remote peer latency in milliseconds [0-5000]
+ --peer-latency-std-dev Remote peer latency standard deviation
+ --profile Enable CPU Profiling with Pyroscope
+ --pyroscope-url Pyroscope Server URL [default: http://localhost:4040]
+ --pyroscope-sample-rate Pyroscope Sample Rate [default: 113]
+ --cache-misses Enable Cache Misses Profiling with Valgrind. Linux only, Valgrind must be in the PATH
+ -h, --help Print help
```
These apply to all test objectives, except `test-sequence` which relies on the values being specified in a file.
@@ -152,8 +150,8 @@ Benchmark availability recovery strategies
Usage: subsystem-bench data-availability-read [OPTIONS]
Options:
- -f, --fetch-from-backers Turbo boost AD Read by fetching the full availability datafrom backers first. Saves CPU
- as we don't need to re-construct from chunks. Tipically this is only faster if nodes
+ -f, --fetch-from-backers Turbo boost AD Read by fetching the full availability datafrom backers first. Saves CPU
+ as we don't need to re-construct from chunks. Tipically this is only faster if nodes
have enough bandwidth
-h, --help Print help
```
@@ -181,9 +179,9 @@ Let's run an availabilty read test which will recover availability for 10 cores
node validator network.
```
- target/testnet/subsystem-bench --n-cores 10 data-availability-read
-[2023-11-28T09:01:59Z INFO subsystem_bench::core::display] n_validators = 500, n_cores = 10, pov_size = 5120 - 5120,
- error = 0, latency = None
+ target/testnet/subsystem-bench --n-cores 10 data-availability-read
+[2023-11-28T09:01:59Z INFO subsystem_bench::core::display] n_validators = 500, n_cores = 10, pov_size = 5120 - 5120,
+ latency = None
[2023-11-28T09:01:59Z INFO subsystem-bench::availability] Generating template candidate index=0 pov_size=5242880
[2023-11-28T09:01:59Z INFO subsystem-bench::availability] Created test environment.
[2023-11-28T09:01:59Z INFO subsystem-bench::availability] Pre-generating 10 candidates.
@@ -196,8 +194,8 @@ node validator network.
[2023-11-28T09:02:07Z INFO subsystem_bench::availability] All blocks processed in 6001ms
[2023-11-28T09:02:07Z INFO subsystem_bench::availability] Throughput: 51200 KiB/block
[2023-11-28T09:02:07Z INFO subsystem_bench::availability] Block time: 6001 ms
-[2023-11-28T09:02:07Z INFO subsystem_bench::availability]
-
+[2023-11-28T09:02:07Z INFO subsystem_bench::availability]
+
Total received from network: 66 MiB
Total sent to network: 58 KiB
Total subsystem CPU usage 4.16s
diff --git a/polkadot/node/subsystem-bench/examples/availability_read.yaml b/polkadot/node/subsystem-bench/examples/availability_read.yaml
index 311ea97214..82355b0e29 100644
--- a/polkadot/node/subsystem-bench/examples/availability_read.yaml
+++ b/polkadot/node/subsystem-bench/examples/availability_read.yaml
@@ -1,7 +1,7 @@
TestConfiguration:
# Test 1
- objective: !DataAvailabilityRead
- fetch_from_backers: false
+ fetch_from_backers: true
n_validators: 300
n_cores: 20
min_pov_size: 5120
@@ -9,18 +9,14 @@ TestConfiguration:
peer_bandwidth: 52428800
bandwidth: 52428800
latency:
- min_latency:
- secs: 0
- nanos: 1000000
- max_latency:
- secs: 0
- nanos: 100000000
- error: 3
+ mean_latency_ms: 100
+ std_dev: 1
num_blocks: 3
+ connectivity: 90
# Test 2
- objective: !DataAvailabilityRead
- fetch_from_backers: false
+ fetch_from_backers: true
n_validators: 500
n_cores: 20
min_pov_size: 5120
@@ -28,18 +24,14 @@ TestConfiguration:
peer_bandwidth: 52428800
bandwidth: 52428800
latency:
- min_latency:
- secs: 0
- nanos: 1000000
- max_latency:
- secs: 0
- nanos: 100000000
- error: 3
+ mean_latency_ms: 100
+ std_dev: 1
num_blocks: 3
+ connectivity: 90
# Test 3
- objective: !DataAvailabilityRead
- fetch_from_backers: false
+ fetch_from_backers: true
n_validators: 1000
n_cores: 20
min_pov_size: 5120
@@ -47,11 +39,7 @@ TestConfiguration:
peer_bandwidth: 52428800
bandwidth: 52428800
latency:
- min_latency:
- secs: 0
- nanos: 1000000
- max_latency:
- secs: 0
- nanos: 100000000
- error: 3
+ mean_latency_ms: 100
+ std_dev: 1
num_blocks: 3
+ connectivity: 90
diff --git a/polkadot/node/subsystem-bench/examples/availability_write.yaml b/polkadot/node/subsystem-bench/examples/availability_write.yaml
new file mode 100644
index 0000000000..64e07d7696
--- /dev/null
+++ b/polkadot/node/subsystem-bench/examples/availability_write.yaml
@@ -0,0 +1,15 @@
+TestConfiguration:
+# Test 1kV, 200 cores, max Pov
+- objective: DataAvailabilityWrite
+ n_validators: 1000
+ n_cores: 200
+ max_validators_per_core: 5
+ min_pov_size: 5120
+ max_pov_size: 5120
+ peer_bandwidth: 52428800
+ bandwidth: 52428800
+ latency:
+ mean_latency_ms: 30
+ std_dev: 2.0
+ connectivity: 75
+ num_blocks: 3
diff --git a/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs
new file mode 100644
index 0000000000..18ea2f7289
--- /dev/null
+++ b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs
@@ -0,0 +1,57 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use super::*;
+
+use polkadot_node_metrics::metrics::Metrics;
+
+use polkadot_node_core_av_store::Config;
+use polkadot_node_subsystem_util::database::Database;
+
+use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
+
+mod columns {
+ pub const DATA: u32 = 0;
+ pub const META: u32 = 1;
+ pub const NUM_COLUMNS: u32 = 2;
+}
+
+const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META };
+
+struct DumbOracle;
+
+impl sp_consensus::SyncOracle for DumbOracle {
+ fn is_major_syncing(&self) -> bool {
+ false
+ }
+
+ fn is_offline(&self) -> bool {
+ unimplemented!("oh no!")
+ }
+}
+
+pub fn new_av_store(dependencies: &TestEnvironmentDependencies) -> AvailabilityStoreSubsystem {
+ let metrics = Metrics::try_register(&dependencies.registry).unwrap();
+
+ AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(DumbOracle), metrics)
+}
+
+fn test_store() -> Arc {
+ let db = kvdb_memorydb::create(columns::NUM_COLUMNS);
+ let db =
+ polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[columns::META]);
+ Arc::new(db)
+}
diff --git a/polkadot/node/subsystem-bench/src/availability/mod.rs b/polkadot/node/subsystem-bench/src/availability/mod.rs
index faedccdf3e..3a42190e6e 100644
--- a/polkadot/node/subsystem-bench/src/availability/mod.rs
+++ b/polkadot/node/subsystem-bench/src/availability/mod.rs
@@ -13,25 +13,41 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
+use crate::{core::mock::ChainApiState, TestEnvironment};
+use av_store::NetworkAvailabilityState;
+use bitvec::bitvec;
+use colored::Colorize;
use itertools::Itertools;
+use polkadot_availability_bitfield_distribution::BitfieldDistribution;
+use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
+use polkadot_node_subsystem::{Overseer, OverseerConnector, SpawnGlue};
+use polkadot_node_subsystem_types::{
+ messages::{AvailabilityStoreMessage, NetworkBridgeEvent},
+ Span,
+};
+use polkadot_overseer::Handle as OverseerHandle;
+use sc_network::{request_responses::ProtocolConfig, PeerId};
+use sp_core::H256;
use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant};
-use crate::TestEnvironment;
-use polkadot_node_subsystem::{Overseer, OverseerConnector, SpawnGlue};
-use polkadot_node_subsystem_test_helpers::derive_erasure_chunks_with_proofs_and_root;
-use polkadot_overseer::Handle as OverseerHandle;
-use sc_network::request_responses::ProtocolConfig;
-
-use colored::Colorize;
-
+use av_store_helpers::new_av_store;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
+use polkadot_availability_distribution::{
+ AvailabilityDistributionSubsystem, IncomingRequestReceivers,
+};
use polkadot_node_metrics::metrics::Metrics;
use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
+use polkadot_node_primitives::{AvailableData, ErasureChunk};
use crate::GENESIS_HASH;
use parity_scale_codec::Encode;
-use polkadot_node_network_protocol::request_response::{IncomingRequest, ReqProtocolNames};
+use polkadot_node_network_protocol::{
+ request_response::{v1::ChunkFetchingRequest, IncomingRequest, ReqProtocolNames},
+ OurView, Versioned, VersionedValidationProtocol,
+};
+use sc_network::request_responses::IncomingRequest as RawIncomingRequest;
+
use polkadot_node_primitives::{BlockData, PoV};
use polkadot_node_subsystem::messages::{AllMessages, AvailabilityRecoveryMessage};
@@ -39,8 +55,8 @@ use crate::core::{
environment::TestEnvironmentDependencies,
mock::{
av_store,
- network_bridge::{self, MockNetworkBridgeTx, NetworkAvailabilityState},
- runtime_api, MockAvailabilityStore, MockRuntimeApi,
+ network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx},
+ runtime_api, MockAvailabilityStore, MockChainApi, MockRuntimeApi,
},
};
@@ -48,24 +64,26 @@ use super::core::{configuration::TestConfiguration, mock::dummy_builder, network
const LOG_TARGET: &str = "subsystem-bench::availability";
-use polkadot_node_primitives::{AvailableData, ErasureChunk};
-
use super::{cli::TestObjective, core::mock::AlwaysSupportsParachains};
-use polkadot_node_subsystem_test_helpers::mock::new_block_import_info;
+use polkadot_node_subsystem_test_helpers::{
+ derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info,
+};
use polkadot_primitives::{
- CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData, PersistedValidationData,
+ AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData,
+ Header, PersistedValidationData, Signed, SigningContext, ValidatorIndex,
};
use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash};
use sc_service::SpawnTaskHandle;
+mod av_store_helpers;
mod cli;
pub use cli::{DataAvailabilityReadOptions, NetworkEmulation};
-fn build_overseer(
+fn build_overseer_for_availability_read(
spawn_task_handle: SpawnTaskHandle,
runtime_api: MockRuntimeApi,
av_store: MockAvailabilityStore,
- network_bridge: MockNetworkBridgeTx,
+ network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx),
availability_recovery: AvailabilityRecoverySubsystem,
) -> (Overseer, AlwaysSupportsParachains>, OverseerHandle) {
let overseer_connector = OverseerConnector::with_event_capacity(64000);
@@ -73,7 +91,8 @@ fn build_overseer(
let builder = dummy
.replace_runtime_api(|_| runtime_api)
.replace_availability_store(|_| av_store)
- .replace_network_bridge_tx(|_| network_bridge)
+ .replace_network_bridge_tx(|_| network_bridge.0)
+ .replace_network_bridge_rx(|_| network_bridge.1)
.replace_availability_recovery(|_| availability_recovery);
let (overseer, raw_handle) =
@@ -82,11 +101,38 @@ fn build_overseer(
(overseer, OverseerHandle::new(raw_handle))
}
-/// Takes a test configuration and uses it to creates the `TestEnvironment`.
+fn build_overseer_for_availability_write(
+ spawn_task_handle: SpawnTaskHandle,
+ runtime_api: MockRuntimeApi,
+ network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx),
+ availability_distribution: AvailabilityDistributionSubsystem,
+ chain_api: MockChainApi,
+ availability_store: AvailabilityStoreSubsystem,
+ bitfield_distribution: BitfieldDistribution,
+) -> (Overseer, AlwaysSupportsParachains>, OverseerHandle) {
+ let overseer_connector = OverseerConnector::with_event_capacity(64000);
+ let dummy = dummy_builder!(spawn_task_handle);
+ let builder = dummy
+ .replace_runtime_api(|_| runtime_api)
+ .replace_availability_store(|_| availability_store)
+ .replace_network_bridge_tx(|_| network_bridge.0)
+ .replace_network_bridge_rx(|_| network_bridge.1)
+ .replace_chain_api(|_| chain_api)
+ .replace_bitfield_distribution(|_| bitfield_distribution)
+ // This is needed to test own chunk recovery for `n_cores`.
+ .replace_availability_distribution(|_| availability_distribution);
+
+ let (overseer, raw_handle) =
+ builder.build_with_connector(overseer_connector).expect("Should not fail");
+
+ (overseer, OverseerHandle::new(raw_handle))
+}
+
+/// Takes a test configuration and uses it to create the `TestEnvironment`.
pub fn prepare_test(
config: TestConfiguration,
state: &mut TestState,
-) -> (TestEnvironment, ProtocolConfig) {
+) -> (TestEnvironment, Vec) {
prepare_test_inner(config, state, TestEnvironmentDependencies::default())
}
@@ -94,14 +140,38 @@ fn prepare_test_inner(
config: TestConfiguration,
state: &mut TestState,
dependencies: TestEnvironmentDependencies,
-) -> (TestEnvironment, ProtocolConfig) {
+) -> (TestEnvironment, Vec) {
// Generate test authorities.
let test_authorities = config.generate_authorities();
- let runtime_api = runtime_api::MockRuntimeApi::new(config.clone(), test_authorities.clone());
+ let mut candidate_hashes: HashMap> = HashMap::new();
- let av_store =
- av_store::MockAvailabilityStore::new(state.chunks.clone(), state.candidate_hashes.clone());
+ // Prepare per block candidates.
+ // Genesis block is always finalized, so we start at 1.
+ for block_num in 1..=config.num_blocks {
+ for _ in 0..config.n_cores {
+ candidate_hashes
+ .entry(Hash::repeat_byte(block_num as u8))
+ .or_default()
+ .push(state.next_candidate().expect("Cycle iterator"))
+ }
+
+ // First candidate is our backed candidate.
+ state.backed_candidates.push(
+ candidate_hashes
+ .get(&Hash::repeat_byte(block_num as u8))
+ .expect("just inserted above")
+ .get(0)
+ .expect("just inserted above")
+ .clone(),
+ );
+ }
+
+ let runtime_api = runtime_api::MockRuntimeApi::new(
+ config.clone(),
+ test_authorities.clone(),
+ candidate_hashes,
+ );
let availability_state = NetworkAvailabilityState {
candidate_hashes: state.candidate_hashes.clone(),
@@ -109,45 +179,112 @@ fn prepare_test_inner(
chunks: state.chunks.clone(),
};
- let req_protocol_names = ReqProtocolNames::new(GENESIS_HASH, None);
- let (collation_req_receiver, req_cfg) =
- IncomingRequest::get_config_receiver(&req_protocol_names);
+ let mut req_cfgs = Vec::new();
- let network =
- NetworkEmulator::new(&config, &dependencies, &test_authorities, req_protocol_names);
+ let (collation_req_receiver, collation_req_cfg) =
+ IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
+ req_cfgs.push(collation_req_cfg);
+
+ let (pov_req_receiver, pov_req_cfg) =
+ IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
+
+ let (chunk_req_receiver, chunk_req_cfg) =
+ IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
+ req_cfgs.push(pov_req_cfg);
+
+ let (network, network_interface, network_receiver) =
+ new_network(&config, &dependencies, &test_authorities, vec![Arc::new(availability_state)]);
let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
- config.clone(),
- availability_state,
network.clone(),
+ network_interface.subsystem_sender(),
);
- let use_fast_path = match &state.config().objective {
- TestObjective::DataAvailabilityRead(options) => options.fetch_from_backers,
- _ => panic!("Unexpected objective"),
+ let network_bridge_rx =
+ network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg.clone()));
+
+ let (overseer, overseer_handle) = match &state.config().objective {
+ TestObjective::DataAvailabilityRead(options) => {
+ let use_fast_path = options.fetch_from_backers;
+
+ let subsystem = if use_fast_path {
+ AvailabilityRecoverySubsystem::with_fast_path(
+ collation_req_receiver,
+ Metrics::try_register(&dependencies.registry).unwrap(),
+ )
+ } else {
+ AvailabilityRecoverySubsystem::with_chunks_only(
+ collation_req_receiver,
+ Metrics::try_register(&dependencies.registry).unwrap(),
+ )
+ };
+
+ // Use a mocked av-store.
+ let av_store = av_store::MockAvailabilityStore::new(
+ state.chunks.clone(),
+ state.candidate_hashes.clone(),
+ );
+
+ build_overseer_for_availability_read(
+ dependencies.task_manager.spawn_handle(),
+ runtime_api,
+ av_store,
+ (network_bridge_tx, network_bridge_rx),
+ subsystem,
+ )
+ },
+ TestObjective::DataAvailabilityWrite => {
+ let availability_distribution = AvailabilityDistributionSubsystem::new(
+ test_authorities.keyring.keystore(),
+ IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
+ Metrics::try_register(&dependencies.registry).unwrap(),
+ );
+
+ let block_headers = (0..=config.num_blocks)
+ .map(|block_number| {
+ (
+ Hash::repeat_byte(block_number as u8),
+ Header {
+ digest: Default::default(),
+ number: block_number as BlockNumber,
+ parent_hash: Default::default(),
+ extrinsics_root: Default::default(),
+ state_root: Default::default(),
+ },
+ )
+ })
+ .collect::>();
+
+ let chain_api_state = ChainApiState { block_headers };
+ let chain_api = MockChainApi::new(chain_api_state);
+ let bitfield_distribution =
+ BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap());
+ build_overseer_for_availability_write(
+ dependencies.task_manager.spawn_handle(),
+ runtime_api,
+ (network_bridge_tx, network_bridge_rx),
+ availability_distribution,
+ chain_api,
+ new_av_store(&dependencies),
+ bitfield_distribution,
+ )
+ },
+ _ => {
+ unimplemented!("Invalid test objective")
+ },
};
- let subsystem = if use_fast_path {
- AvailabilityRecoverySubsystem::with_fast_path(
- collation_req_receiver,
- Metrics::try_register(&dependencies.registry).unwrap(),
- )
- } else {
- AvailabilityRecoverySubsystem::with_chunks_only(
- collation_req_receiver,
- Metrics::try_register(&dependencies.registry).unwrap(),
- )
- };
-
- let (overseer, overseer_handle) = build_overseer(
- dependencies.task_manager.spawn_handle(),
- runtime_api,
- av_store,
- network_bridge_tx,
- subsystem,
- );
-
- (TestEnvironment::new(dependencies, config, network, overseer, overseer_handle), req_cfg)
+ (
+ TestEnvironment::new(
+ dependencies,
+ config,
+ network,
+ overseer,
+ overseer_handle,
+ test_authorities,
+ ),
+ req_cfgs,
+ )
}
#[derive(Clone)]
@@ -169,6 +306,8 @@ pub struct TestState {
available_data: Vec,
// Per candiadte index chunks
chunks: Vec>,
+ // Per relay chain block - candidate backed by our backing group
+ backed_candidates: Vec,
}
impl TestState {
@@ -255,24 +394,27 @@ impl TestState {
candidate_receipt_templates.push(candidate_receipt);
}
- let pov_sizes = config.pov_sizes().to_owned();
- let pov_sizes = pov_sizes.into_iter().cycle();
gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue());
let mut _self = Self {
- config,
available_data,
candidate_receipt_templates,
chunks,
pov_size_to_candidate,
- pov_sizes,
+ pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(),
candidate_hashes: HashMap::new(),
candidates: Vec::new().into_iter().cycle(),
+ backed_candidates: Vec::new(),
+ config,
};
_self.generate_candidates();
_self
}
+
+ pub fn backed_candidates(&mut self) -> &mut Vec {
+ &mut self.backed_candidates
+ }
}
pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: TestState) {
@@ -280,15 +422,15 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T
env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await;
- let start_marker = Instant::now();
+ let test_start = Instant::now();
let mut batch = FuturesUnordered::new();
let mut availability_bytes = 0u128;
env.metrics().set_n_validators(config.n_validators);
env.metrics().set_n_cores(config.n_cores);
- for block_num in 0..env.config().num_blocks {
- gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks);
+ for block_num in 1..=env.config().num_blocks {
+ gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks);
env.metrics().set_current_block(block_num);
let block_start_ts = Instant::now();
@@ -311,7 +453,7 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T
env.send_message(message).await;
}
- gum::info!("{}", format!("{} recoveries pending", batch.len()).bright_black());
+ gum::info!(target: LOG_TARGET, "{}", format!("{} recoveries pending", batch.len()).bright_black());
while let Some(completed) = batch.next().await {
let available_data = completed.unwrap().unwrap();
env.metrics().on_pov_size(available_data.encoded_size());
@@ -320,22 +462,199 @@ pub async fn benchmark_availability_read(env: &mut TestEnvironment, mut state: T
let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
env.metrics().set_block_time(block_time);
- gum::info!("All work for block completed in {}", format!("{:?}ms", block_time).cyan());
+ gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{:?}ms", block_time).cyan());
}
- let duration: u128 = start_marker.elapsed().as_millis();
+ let duration: u128 = test_start.elapsed().as_millis();
let availability_bytes = availability_bytes / 1024;
- gum::info!("All blocks processed in {}", format!("{:?}ms", duration).cyan());
- gum::info!(
+ gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan());
+ gum::info!(target: LOG_TARGET,
"Throughput: {}",
format!("{} KiB/block", availability_bytes / env.config().num_blocks as u128).bright_red()
);
- gum::info!(
- "Block time: {}",
- format!("{} ms", start_marker.elapsed().as_millis() / env.config().num_blocks as u128)
- .red()
+ gum::info!(target: LOG_TARGET,
+ "Avg block time: {}",
+ format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
);
- gum::info!("{}", &env);
+ env.display_network_usage();
+ env.display_cpu_usage(&["availability-recovery"]);
env.stop().await;
}
+
+pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state: TestState) {
+ let config = env.config().clone();
+
+ env.metrics().set_n_validators(config.n_validators);
+ env.metrics().set_n_cores(config.n_cores);
+
+ gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ...");
+ for backed_candidate in state.backed_candidates().clone() {
+ let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap();
+ let available_data = state.available_data[candidate_index].clone();
+ let (tx, rx) = oneshot::channel();
+ env.send_message(AllMessages::AvailabilityStore(
+ AvailabilityStoreMessage::StoreAvailableData {
+ candidate_hash: backed_candidate.hash(),
+ n_validators: config.n_validators as u32,
+ available_data,
+ expected_erasure_root: backed_candidate.descriptor().erasure_root,
+ tx,
+ },
+ ))
+ .await;
+
+ rx.await
+ .unwrap()
+ .expect("Test candidates are stored nicely in availability store");
+ }
+
+ gum::info!(target: LOG_TARGET, "Done");
+
+ let test_start = Instant::now();
+
+ for block_num in 1..=env.config().num_blocks {
+ gum::info!(target: LOG_TARGET, "Current block #{}", block_num);
+ env.metrics().set_current_block(block_num);
+
+ let block_start_ts = Instant::now();
+ let relay_block_hash = Hash::repeat_byte(block_num as u8);
+ env.import_block(new_block_import_info(relay_block_hash, block_num as BlockNumber))
+ .await;
+
+ // Inform bitfield distribution about our view of the current test block
+ let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate(
+ NetworkBridgeEvent::OurViewChange(OurView::new(vec![(relay_block_hash, Arc::new(Span::Disabled))], 0))
+ );
+ env.send_message(AllMessages::BitfieldDistribution(message)).await;
+
+ let chunk_fetch_start_ts = Instant::now();
+
+ // Request chunks of our own backed candidate from all other validators.
+ let mut receivers = Vec::new();
+ for index in 1..config.n_validators {
+ let (pending_response, pending_response_receiver) = oneshot::channel();
+
+ let request = RawIncomingRequest {
+ peer: PeerId::random(),
+ payload: ChunkFetchingRequest {
+ candidate_hash: state.backed_candidates()[block_num - 1].hash(),
+ index: ValidatorIndex(index as u32),
+ }
+ .encode(),
+ pending_response,
+ };
+
+ let peer = env
+ .authorities()
+ .validator_authority_id
+ .get(index)
+ .expect("all validators have keys");
+
+ if env.network().is_peer_connected(peer) &&
+ env.network().send_request_from_peer(peer, request).is_ok()
+ {
+ receivers.push(pending_response_receiver);
+ }
+ }
+
+ gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ...");
+ for receiver in receivers.into_iter() {
+ let response = receiver.await.expect("Chunk is always served succesfully");
+ // TODO: check if chunk is the one the peer expects to receive.
+ assert!(response.result.is_ok());
+ }
+
+ let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis();
+
+ gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration);
+
+ let signing_context = SigningContext { session_index: 0, parent_hash: relay_block_hash };
+ let network = env.network().clone();
+ let authorities = env.authorities().clone();
+ let n_validators = config.n_validators;
+
+ // Spawn a task that will generate `n_validators` - 1 signed bitfields and
+ // send them from the emulated peers to the subsystem.
+ // TODO: Implement topology.
+ env.spawn_blocking("send-bitfields", async move {
+ for index in 1..n_validators {
+ let validator_public =
+ authorities.validator_public.get(index).expect("All validator keys are known");
+
+ // Node has all the chunks in the world.
+ let payload: AvailabilityBitfield =
+ AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
+ // TODO(soon): Use pre-signed messages. This is quite intensive on the CPU.
+ let signed_bitfield = Signed::<AvailabilityBitfield>::sign(
+ &authorities.keyring.keystore(),
+ payload,
+ &signing_context,
+ ValidatorIndex(index as u32),
+ validator_public,
+ )
+ .ok()
+ .flatten()
+ .expect("should be signed");
+
+ let from_peer = &authorities.validator_authority_id[index];
+
+ let message = peer_bitfield_message_v2(relay_block_hash, signed_bitfield);
+
+ // Send the action from peer only if it is connected to our node.
+ if network.is_peer_connected(from_peer) {
+ let _ = network.send_message_from_peer(from_peer, message);
+ }
+ }
+ });
+
+ gum::info!(
+ "Waiting for {} bitfields to be received and processed",
+ config.connected_count()
+ );
+
+ // Wait for all bitfields to be processed.
+ env.wait_until_metric_eq(
+ "polkadot_parachain_received_availabilty_bitfields_total",
+ config.connected_count() * block_num,
+ )
+ .await;
+
+ gum::info!(target: LOG_TARGET, "All bitfields processed");
+
+ let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
+ env.metrics().set_block_time(block_time);
+ gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{:?}ms", block_time).cyan());
+ }
+
+ let duration: u128 = test_start.elapsed().as_millis();
+ gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan());
+ gum::info!(target: LOG_TARGET,
+ "Avg block time: {}",
+ format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
+ );
+
+ env.display_network_usage();
+
+ env.display_cpu_usage(&[
+ "availability-distribution",
+ "bitfield-distribution",
+ "availability-store",
+ ]);
+
+ env.stop().await;
+}
+
+pub fn peer_bitfield_message_v2(
+ relay_hash: H256,
+ signed_bitfield: Signed<AvailabilityBitfield>,
+) -> VersionedValidationProtocol {
+ let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield(
+ relay_hash,
+ signed_bitfield.into(),
+ );
+
+ Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
+ bitfield,
+ ))
+}
diff --git a/polkadot/node/subsystem-bench/src/cli.rs b/polkadot/node/subsystem-bench/src/cli.rs
index 3352f33a35..7213713eb6 100644
--- a/polkadot/node/subsystem-bench/src/cli.rs
+++ b/polkadot/node/subsystem-bench/src/cli.rs
@@ -24,12 +24,14 @@ pub struct TestSequenceOptions {
pub path: String,
}
-/// Define the supported benchmarks targets
+/// Supported test objectives
#[derive(Debug, Clone, clap::Parser, Serialize, Deserialize)]
#[command(rename_all = "kebab-case")]
pub enum TestObjective {
/// Benchmark availability recovery strategies.
DataAvailabilityRead(DataAvailabilityReadOptions),
+ /// Benchmark availability and bitfield distribution.
+ DataAvailabilityWrite,
/// Run a test sequence specified in a file
TestSequence(TestSequenceOptions),
}
diff --git a/polkadot/node/subsystem-bench/src/core/configuration.rs b/polkadot/node/subsystem-bench/src/core/configuration.rs
index 164addb519..66da8a1db4 100644
--- a/polkadot/node/subsystem-bench/src/core/configuration.rs
+++ b/polkadot/node/subsystem-bench/src/core/configuration.rs
@@ -17,11 +17,13 @@
//! Test configuration definition and helpers.
use super::*;
use keyring::Keyring;
-use std::{path::Path, time::Duration};
+use std::path::Path;
pub use crate::cli::TestObjective;
use polkadot_primitives::{AuthorityDiscoveryId, ValidatorId};
-use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
+use rand::thread_rng;
+use rand_distr::{Distribution, Normal, Uniform};
+
use serde::{Deserialize, Serialize};
pub fn random_pov_size(min_pov_size: usize, max_pov_size: usize) -> usize {
@@ -34,13 +36,13 @@ fn random_uniform_sample + From>(min_value: T, max_value:
.into()
}
-/// Peer response latency configuration.
+/// Peer networking latency configuration.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PeerLatency {
- /// Min latency for `NetworkAction` completion.
- pub min_latency: Duration,
- /// Max latency or `NetworkAction` completion.
- pub max_latency: Duration,
+ /// The mean latency (in milliseconds) of the peers.
+ pub mean_latency_ms: usize,
+ /// The standard deviation (in milliseconds) of the peer latency.
+ pub std_dev: f64,
}
// Default PoV size in KiB.
@@ -58,6 +60,11 @@ fn default_connectivity() -> usize {
100
}
+// Default backing group size
+fn default_backing_group_size() -> usize {
+ 5
+}
+
/// The test input parameters
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TestConfiguration {
@@ -67,6 +74,9 @@ pub struct TestConfiguration {
pub n_validators: usize,
/// Number of cores
pub n_cores: usize,
+ /// Maximum backing group size
+ #[serde(default = "default_backing_group_size")]
+ pub max_validators_per_core: usize,
/// The min PoV size
#[serde(default = "default_pov_size")]
pub min_pov_size: usize,
@@ -82,12 +92,9 @@ pub struct TestConfiguration {
/// The amount of bandiwdth our node has.
#[serde(default = "default_bandwidth")]
pub bandwidth: usize,
- /// Optional peer emulation latency
+ /// Optional peer emulation latency (round trip time) with respect to the node under test
#[serde(default)]
pub latency: Option,
- /// Error probability, applies to sending messages to the emulated network peers
- #[serde(default)]
- pub error: usize,
/// Connectivity ratio, the percentage of peers we are not connected to, but ar part of
/// the topology.
#[serde(default = "default_connectivity")]
@@ -129,7 +136,7 @@ impl TestSequence {
/// Helper struct for authority related state.
#[derive(Clone)]
pub struct TestAuthorities {
- pub keyrings: Vec,
+ pub keyring: Keyring,
pub validator_public: Vec,
pub validator_authority_id: Vec,
}
@@ -146,25 +153,27 @@ impl TestConfiguration {
pub fn pov_sizes(&self) -> &[usize] {
&self.pov_sizes
}
+ /// Return the number of peers connected to our node.
+ pub fn connected_count(&self) -> usize {
+ ((self.n_validators - 1) as f64 / (100.0 / self.connectivity as f64)) as usize
+ }
/// Generates the authority keys we need for the network emulation.
pub fn generate_authorities(&self) -> TestAuthorities {
- let keyrings = (0..self.n_validators)
- .map(|peer_index| Keyring::new(format!("Node{}", peer_index)))
+ let keyring = Keyring::default();
+
+ let keys = (0..self.n_validators)
+ .map(|peer_index| keyring.sr25519_new(format!("Node{}", peer_index)))
.collect::>();
// Generate `AuthorityDiscoveryId`` for each peer
- let validator_public: Vec = keyrings
- .iter()
- .map(|keyring: &Keyring| keyring.clone().public().into())
- .collect::>();
+ let validator_public: Vec =
+ keys.iter().map(|key| (*key).into()).collect::>();
- let validator_authority_id: Vec = keyrings
- .iter()
- .map(|keyring| keyring.clone().public().into())
- .collect::>();
+ let validator_authority_id: Vec =
+ keys.iter().map(|key| (*key).into()).collect::>();
- TestAuthorities { keyrings, validator_public, validator_authority_id }
+ TestAuthorities { keyring, validator_public, validator_authority_id }
}
/// An unconstrained standard configuration matching Polkadot/Kusama
@@ -180,12 +189,12 @@ impl TestConfiguration {
objective,
n_cores,
n_validators,
+ max_validators_per_core: 5,
pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size),
bandwidth: 50 * 1024 * 1024,
peer_bandwidth: 50 * 1024 * 1024,
// No latency
latency: None,
- error: 0,
num_blocks,
min_pov_size,
max_pov_size,
@@ -205,14 +214,11 @@ impl TestConfiguration {
objective,
n_cores,
n_validators,
+ max_validators_per_core: 5,
pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size),
bandwidth: 50 * 1024 * 1024,
peer_bandwidth: 50 * 1024 * 1024,
- latency: Some(PeerLatency {
- min_latency: Duration::from_millis(1),
- max_latency: Duration::from_millis(100),
- }),
- error: 3,
+ latency: Some(PeerLatency { mean_latency_ms: 50, std_dev: 12.5 }),
num_blocks,
min_pov_size,
max_pov_size,
@@ -232,14 +238,11 @@ impl TestConfiguration {
objective,
n_cores,
n_validators,
+ max_validators_per_core: 5,
pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size),
bandwidth: 50 * 1024 * 1024,
peer_bandwidth: 50 * 1024 * 1024,
- latency: Some(PeerLatency {
- min_latency: Duration::from_millis(10),
- max_latency: Duration::from_millis(500),
- }),
- error: 33,
+ latency: Some(PeerLatency { mean_latency_ms: 150, std_dev: 40.0 }),
num_blocks,
min_pov_size,
max_pov_size,
@@ -248,15 +251,14 @@ impl TestConfiguration {
}
}
-/// Produce a randomized duration between `min` and `max`.
-pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> Option {
- maybe_peer_latency.map(|peer_latency| {
- Uniform::from(peer_latency.min_latency..=peer_latency.max_latency).sample(&mut thread_rng())
- })
-}
-
-/// Generate a random error based on `probability`.
-/// `probability` should be a number between 0 and 100.
-pub fn random_error(probability: usize) -> bool {
- Uniform::from(0..=99).sample(&mut thread_rng()) < probability
+/// Sample latency (in milliseconds) from a normal distribution with parameters
+/// specified in `maybe_peer_latency`.
+pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> usize {
+ maybe_peer_latency
+ .map(|latency_config| {
+ Normal::new(latency_config.mean_latency_ms as f64, latency_config.std_dev)
+ .expect("normal distribution parameters are good")
+ .sample(&mut thread_rng())
+ })
+ .unwrap_or(0.0) as usize
}
diff --git a/polkadot/node/subsystem-bench/src/core/display.rs b/polkadot/node/subsystem-bench/src/core/display.rs
index d600cc484c..bca82d7b90 100644
--- a/polkadot/node/subsystem-bench/src/core/display.rs
+++ b/polkadot/node/subsystem-bench/src/core/display.rs
@@ -180,12 +180,13 @@ pub fn parse_metrics(registry: &Registry) -> MetricCollection {
pub fn display_configuration(test_config: &TestConfiguration) {
gum::info!(
- "{}, {}, {}, {}, {}",
+ "[{}] {}, {}, {}, {}, {}",
+ format!("objective = {:?}", test_config.objective).green(),
format!("n_validators = {}", test_config.n_validators).blue(),
format!("n_cores = {}", test_config.n_cores).blue(),
format!("pov_size = {} - {}", test_config.min_pov_size, test_config.max_pov_size)
.bright_black(),
- format!("error = {}", test_config.error).bright_black(),
+ format!("connectivity = {}", test_config.connectivity).bright_black(),
format!("latency = {:?}", test_config.latency).bright_black(),
);
}
diff --git a/polkadot/node/subsystem-bench/src/core/environment.rs b/polkadot/node/subsystem-bench/src/core/environment.rs
index 2475964740..b684631643 100644
--- a/polkadot/node/subsystem-bench/src/core/environment.rs
+++ b/polkadot/node/subsystem-bench/src/core/environment.rs
@@ -15,12 +15,12 @@
// along with Polkadot. If not, see .
//! Test environment implementation
use crate::{
- core::{mock::AlwaysSupportsParachains, network::NetworkEmulator},
+ core::{mock::AlwaysSupportsParachains, network::NetworkEmulatorHandle},
TestConfiguration,
};
use colored::Colorize;
use core::time::Duration;
-use futures::FutureExt;
+use futures::{Future, FutureExt};
use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
@@ -29,15 +29,12 @@ use polkadot_node_subsystem_util::metrics::prometheus::{
self, Gauge, Histogram, PrometheusError, Registry, U64,
};
-use sc_network::peer_store::LOG_TARGET;
use sc_service::{SpawnTaskHandle, TaskManager};
-use std::{
- fmt::Display,
- net::{Ipv4Addr, SocketAddr},
-};
+use std::net::{Ipv4Addr, SocketAddr};
use tokio::runtime::Handle;
-const MIB: f64 = 1024.0 * 1024.0;
+const LOG_TARGET: &str = "subsystem-bench::environment";
+use super::configuration::TestAuthorities;
/// Test environment/configuration metrics
#[derive(Clone)]
@@ -56,9 +53,8 @@ pub struct TestEnvironmentMetrics {
impl TestEnvironmentMetrics {
pub fn new(registry: &Registry) -> Result {
- let mut buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
+ let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
.expect("arguments are always valid; qed");
- buckets.extend(vec![5.0 * MIB, 6.0 * MIB, 7.0 * MIB, 8.0 * MIB, 9.0 * MIB, 10.0 * MIB]);
Ok(Self {
n_validators: prometheus::register(
@@ -150,7 +146,7 @@ pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
// We use this to bail out sending messages to the subsystem if it is overloaded such that
// the time of flight is breaches 5s.
// This should eventually be a test parameter.
-const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
+pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
/// The test environment is the high level wrapper of all things required to test
/// a certain subsystem.
@@ -189,9 +185,11 @@ pub struct TestEnvironment {
/// The test configuration.
config: TestConfiguration,
/// A handle to the network emulator.
- network: NetworkEmulator,
+ network: NetworkEmulatorHandle,
/// Configuration/env metrics
metrics: TestEnvironmentMetrics,
+ /// Test authorities generated from the configuration.
+ authorities: TestAuthorities,
}
impl TestEnvironment {
@@ -199,9 +197,10 @@ impl TestEnvironment {
pub fn new(
dependencies: TestEnvironmentDependencies,
config: TestConfiguration,
- network: NetworkEmulator,
+ network: NetworkEmulatorHandle,
overseer: Overseer, AlwaysSupportsParachains>,
overseer_handle: OverseerHandle,
+ authorities: TestAuthorities,
) -> Self {
let metrics = TestEnvironmentMetrics::new(&dependencies.registry)
.expect("Metrics need to be registered");
@@ -230,30 +229,62 @@ impl TestEnvironment {
config,
network,
metrics,
+ authorities,
}
}
+ /// Returns the test configuration.
pub fn config(&self) -> &TestConfiguration {
&self.config
}
- pub fn network(&self) -> &NetworkEmulator {
+ /// Returns a reference to the inner network emulator handle.
+ pub fn network(&self) -> &NetworkEmulatorHandle {
&self.network
}
+ /// Returns the Prometheus registry.
pub fn registry(&self) -> &Registry {
&self.dependencies.registry
}
+ /// Spawn a named task in the `test-environment` task group.
+ #[allow(unused)]
+ pub fn spawn(&self, name: &'static str, task: impl Future