Introduce approval-voting/distribution benchmark (#2621)

## Summary
Built on top of the tooling and ideas introduced in
https://github.com/paritytech/polkadot-sdk/pull/2528, this PR introduces
a synthetic benchmark for measuring and assessing the performance
characteristics of the approval-voting and approval-distribution
subsystems.

Currently this allows us to simulate the behaviour of these subsystems
based on the following dimensions:
```
TestConfiguration:
# Test 1
- objective: !ApprovalsTest
    last_considered_tranche: 89
    min_coalesce: 1
    max_coalesce: 6
    enable_assignments_v2: true
    send_till_tranche: 60
    stop_when_approved: false
    coalesce_tranche_diff: 12
    workdir_prefix: "/tmp"
    num_no_shows_per_candidate: 0
    approval_distribution_expected_tof: 6.0
    approval_distribution_cpu_ms: 3.0
    approval_voting_cpu_ms: 4.30
  n_validators: 500
  n_cores: 100
  n_included_candidates: 100
  min_pov_size: 1120
  max_pov_size: 5120
  peer_bandwidth: 524288000000
  bandwidth: 524288000000
  latency:
    min_latency:
      secs: 0
      nanos: 1000000
    max_latency:
      secs: 0
      nanos: 100000000
  error: 0
  num_blocks: 10
```

## The approach
1. We build a real overseer with the real implementations of the
approval-voting and approval-distribution subsystems.
2. For a given network size, we pre-compute, for each validator, all
potential assignments and approvals it would send. Because this is a
computation-heavy operation, the result is cached in a file on disk and
re-used as long as the generation parameters don't change.
3. The messages are sent according to the configured parameters,
which are split into 3 main benchmarking scenarios.
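
The caching described in step 2 can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the PR's code: the PR names the cache file after a SHA-1 of the serialized options and configuration (see `messages_fingerprint` in the diff), whereas this sketch swaps in the standard library's `DefaultHasher` to stay dependency-free. `GenerationParams`, `cache_path` and `load_or_generate` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::fs;
use std::hash::{Hash, Hasher};
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical subset of the parameters that influence message generation.
#[derive(Hash)]
pub struct GenerationParams {
    pub n_validators: u32,
    pub n_cores: u32,
    pub num_blocks: u32,
    pub enable_assignments_v2: bool,
}

/// Derives the cache file path from the parameters: same parameters, same file.
/// Assumption: `DefaultHasher` stands in for the SHA-1 fingerprint used by the PR.
pub fn cache_path(workdir_prefix: &Path, params: &GenerationParams) -> PathBuf {
    let mut hasher = DefaultHasher::new();
    params.hash(&mut hasher);
    workdir_prefix.join(format!("{:016x}", hasher.finish()))
}

/// Returns the cached bytes if the file exists, otherwise runs `generate`,
/// stores its output under the fingerprinted path and returns it.
pub fn load_or_generate(
    workdir_prefix: &Path,
    params: &GenerationParams,
    generate: impl FnOnce() -> Vec<u8>,
) -> io::Result<Vec<u8>> {
    let path = cache_path(workdir_prefix, params);
    if path.exists() {
        return fs::read(&path);
    }
    let bytes = generate();
    fs::write(&path, &bytes)?;
    Ok(bytes)
}
```

Calling `load_or_generate` twice with the same parameters runs the expensive closure only once. Changing any field of the parameters yields a different file name and therefore triggers a fresh generation.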

## Benchmarking scenarios

### Best case scenario *approvals_throughput_best_case.yaml*
It sends to approval-distribution only the minimum required tranches
needed to gather the needed_approvals, so that a candidate is approved.
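
As a rough illustration of what "minimum required" means here, the sketch below finds the first tranche at which the cumulative number of assignments reaches `needed_approvals`. It is a deliberately simplified model that ignores no-shows and the real approval-checking rules. `min_tranche_to_send` and its input layout are hypothetical and not part of the PR.

```rust
/// Returns the lowest tranche index at which the cumulative number of
/// assignments reaches `needed_approvals`, or `None` if it never does.
///
/// `assignments_per_tranche[t]` is the number of validators assigned in tranche `t`.
/// Simplified model (assumption): no-shows are not taken into account.
pub fn min_tranche_to_send(assignments_per_tranche: &[u32], needed_approvals: u32) -> Option<u32> {
    let mut total = 0u32;
    for (tranche, count) in assignments_per_tranche.iter().enumerate() {
        total += count;
        if total >= needed_approvals {
            return Some(tranche as u32);
        }
    }
    None
}
```

For instance, with 10, 12 and 9 assignments in tranches 0, 1 and 2 and `needed_approvals = 30`, the running totals are 10, 22 and 31, so messages up to tranche 2 would be enough in this model.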

### Behaviour in the presence of no-shows *approvals_no_shows.yaml*
It sends the tranche needed to approve a candidate when we have a
maximum of *num_no_shows_per_candidate* tranches with no-shows for each
candidate.

### Maximum throughput *approvals_throughput.yaml*
It sends all the tranches for each block and measures the CPU used and
the network bandwidth needed by the approval-voting and
approval-distribution subsystems.

## How to run it
```
cargo run -p polkadot-subsystem-bench --release -- test-sequence --path polkadot/node/subsystem-bench/examples/approvals_throughput.yaml
```

## Evaluating performance
### Use the real subsystems metrics
If you follow the steps in
https://github.com/paritytech/polkadot-sdk/tree/master/polkadot/node/subsystem-bench#install-grafana
to install prometheus and grafana locally, all the real metrics for
`approval-distribution`, `approval-voting` and the overseer are available.
E.g.:
<img width="2149" alt="Screenshot 2023-12-05 at 11 07 46"
src="https://github.com/paritytech/polkadot-sdk/assets/49718502/cb8ae2dd-178b-4922-bfa4-dc37e572ed38">

<img width="2551" alt="Screenshot 2023-12-05 at 11 09 42"
src="https://github.com/paritytech/polkadot-sdk/assets/49718502/8b4542ba-88b9-46f9-9b70-cc345366081b">

<img width="2154" alt="Screenshot 2023-12-05 at 11 10 15"
src="https://github.com/paritytech/polkadot-sdk/assets/49718502/b8874d8d-632e-443a-9840-14ad8e90c54f">

<img width="2535" alt="Screenshot 2023-12-05 at 11 10 52"
src="https://github.com/paritytech/polkadot-sdk/assets/49718502/779a439f-fd18-4985-bb80-85d5afad78e2">

### Profile with pyroscope
1. Set up pyroscope following the steps in
https://github.com/paritytech/polkadot-sdk/tree/master/polkadot/node/subsystem-bench#install-pyroscope,
then run any of the benchmark scenarios with the `--profile`
argument.
2. Open the pyroscope dashboard in grafana, e.g.:
<img width="2544" alt="Screenshot 2024-01-09 at 17 09 58"
src="https://github.com/paritytech/polkadot-sdk/assets/49718502/58f50c99-a910-4d20-951a-8b16639303d9">



### Useful logs
1. Network bandwidth requirements:
```
Payload bytes received from peers: 503993 KiB total, 50399 KiB/block
Payload bytes sent to peers: 629971 KiB total, 62997 KiB/block
```

2. CPU usage by the approval-distribution/approval-voting subsystems:
```
approval-distribution CPU usage 84.061s
approval-distribution CPU usage per block 8.406s
approval-voting CPU usage 96.532s
approval-voting CPU usage per block 9.653s
```

3. Time passed until a given block is approved
```
Chain selection approved  after 3500 ms hash=0x0101010101010101010101010101010101010101010101010101010101010101
Chain selection approved  after 4500 ms hash=0x0202020202020202020202020202020202020202020202020202020202020202
```
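
To relate these timings to the approval-voting clock: the diff in this PR makes `TICK_DURATION_MILLIS` (500 ms) public, together with `slot_number_to_tick`, `tick_to_slot_number` and `tranche_to_tick` in the `time` module. The sketch below mirrors that arithmetic with `Slot` simplified to a plain `u64`, an assumption made to keep it dependency-free. `millis_to_ticks` is a hypothetical helper added only for illustration.

```rust
/// Tick length used by approval-voting, as in the PR's `time` module.
pub const TICK_DURATION_MILLIS: u64 = 500;

/// Simplification (assumption): the real code uses the `Slot` newtype.
pub type Slot = u64;
/// A base unit of time, split into half-second intervals.
pub type Tick = u64;

/// Assumes `slot_duration_millis` is evenly divisible by the tick duration.
pub fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
    let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
    slot * ticks_per_slot
}

/// Converts a tick back to the slot number that contains it.
pub fn tick_to_slot_number(slot_duration_millis: u64, tick: Tick) -> Slot {
    let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
    tick / ticks_per_slot
}

/// Tick at which a given tranche of a block authored in `slot` becomes due:
/// one tranche is one tick after the slot's first tick.
pub fn tranche_to_tick(slot_duration_millis: u64, slot: Slot, tranche: u32) -> Tick {
    slot_number_to_tick(slot_duration_millis, slot) + tranche as u64
}

/// Hypothetical helper: number of whole ticks contained in `millis`.
pub fn millis_to_ticks(millis: u64) -> Tick {
    millis / TICK_DURATION_MILLIS
}
```

With a 6000 ms slot (a value assumed here for illustration), each slot spans 12 ticks, and the 3500 ms reported in the log above corresponds to 7 ticks.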

### Using the benchmark to quantify improvements from https://github.com/paritytech/polkadot-sdk/pull/1178 + https://github.com/paritytech/polkadot-sdk/pull/1191

Using a versi-node, we compare the scenario where all new optimisations
are disabled with a scenario where tranche0 assignments are sent in a
single message and a conservative simulation where the coalescing of
approvals gives us just a 50% reduction in the number of messages we send.

Overall, what we see is a speedup of around 30-40% in the time it takes
to process the necessary messages and a 30-40% reduction in the
necessary bandwidth.

#### Best case scenario comparison (minimum required tranches sent)
Unoptimised
```
    Number of blocks: 10
    Payload bytes received from peers: 53289 KiB total, 5328 KiB/block
    Payload bytes sent to peers: 52489 KiB total, 5248 KiB/block
    approval-distribution CPU usage 6.732s
    approval-distribution CPU usage per block 0.673s
    approval-voting CPU usage 9.523s
    approval-voting CPU usage per block 0.952s
```

vs Optimisation enabled
```
   Number of blocks: 10
   Payload bytes received from peers: 32141 KiB total, 3214 KiB/block
   Payload bytes sent to peers: 37314 KiB total, 3731 KiB/block
   approval-distribution CPU usage 4.658s
   approval-distribution CPU usage per block 0.466s
   approval-voting CPU usage 6.236s
   approval-voting CPU usage per block 0.624s
```

#### Worst case: all tranches sent (very unlikely, happens when sharding breaks)

Unoptimised
```
   Number of blocks: 10
   Payload bytes received from peers: 746393 KiB total, 74639 KiB/block
   Payload bytes sent to peers: 729151 KiB total, 72915 KiB/block
   approval-distribution CPU usage 118.681s
   approval-distribution CPU usage per block 11.868s
   approval-voting CPU usage 124.118s
   approval-voting CPU usage per block 12.412s
```

vs optimised
```
    Number of blocks: 10
    Payload bytes received from peers: 503993 KiB total, 50399 KiB/block
    Payload bytes sent to peers: 629971 KiB total, 62997 KiB/block
    approval-distribution CPU usage 84.061s
    approval-distribution CPU usage per block 8.406s
    approval-voting CPU usage 96.532s
    approval-voting CPU usage per block 9.653s
```
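
The relative savings can be recomputed directly from the figures in this section. A minimal sketch, with `reduction_percent`, `per_block` and `print_summary` as hypothetical helper names. The inputs are copied from the logs above.

```rust
/// Percentage by which `optimised` is smaller than `baseline`.
pub fn reduction_percent(baseline: f64, optimised: f64) -> f64 {
    (baseline - optimised) / baseline * 100.0
}

/// Average per block, matching the "per block" lines in the logs.
pub fn per_block(total: f64, num_blocks: u32) -> f64 {
    total / f64::from(num_blocks)
}

/// Prints the reduction for every metric of the best-case and worst-case runs.
pub fn print_summary() {
    // (label, unoptimised, optimised), values taken from the logs above.
    let rows = [
        ("best case: KiB received", 53289.0, 32141.0),
        ("best case: KiB sent", 52489.0, 37314.0),
        ("best case: approval-distribution CPU s", 6.732, 4.658),
        ("best case: approval-voting CPU s", 9.523, 6.236),
        ("worst case: KiB received", 746393.0, 503993.0),
        ("worst case: KiB sent", 729151.0, 629971.0),
        ("worst case: approval-distribution CPU s", 118.681, 84.061),
        ("worst case: approval-voting CPU s", 124.118, 96.532),
    ];
    for (label, before, after) in rows {
        println!("{}: {:.1}% lower", label, reduction_percent(before, after));
    }
}
```

For example, the best-case received payload drops from 53289 KiB to 32141 KiB, a reduction of roughly 39.7%.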


## TODOs
[x] Polish implementation.
[x] Use what we have so far to evaluate
https://github.com/paritytech/polkadot-sdk/pull/1191 before merging.
[x] List of features and additional dimensions we want to use for
benchmarking.
[x] Run benchmark on hardware similar to versi and kusama nodes.
[ ] Add benchmark to be run in CI for catching regressions in
performance.
[ ] Rebase on latest changes for network emulation.

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Alexandru Gheorghe
2024-02-05 08:46:22 +02:00
committed by GitHub
parent 90849b66b9
commit f9f886886b
29 changed files with 2856 additions and 126 deletions
Generated
+13 -2
@@ -13467,6 +13467,7 @@ version = "1.0.0"
dependencies = [
"assert_matches",
"async-trait",
"bincode",
"bitvec",
"clap 4.4.18",
"clap-num",
@@ -13475,16 +13476,19 @@ dependencies = [
"env_logger 0.9.3",
"futures",
"futures-timer",
"hex",
"itertools 0.11.0",
"kvdb-memorydb",
"log",
"orchestra",
"parity-scale-codec",
"paste",
"polkadot-approval-distribution",
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-availability-recovery",
"polkadot-erasure-coding",
"polkadot-node-core-approval-voting",
"polkadot-node-core-av-store",
"polkadot-node-core-chain-api",
"polkadot-node-metrics",
@@ -13501,17 +13505,24 @@ dependencies = [
"pyroscope",
"pyroscope_pprofrs",
"rand",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
"rand_distr",
"sc-keystore",
"sc-network",
"sc-service",
"schnorrkel 0.9.1",
"serde",
"serde_yaml",
"sha1",
"sp-application-crypto",
"sp-consensus",
"sp-consensus-babe",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-runtime",
"sp-timestamp",
"substrate-prometheus-endpoint",
"tokio",
"tracing-gum",
@@ -17208,9 +17219,9 @@ dependencies = [
[[package]]
name = "sha1"
version = "0.10.5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
@@ -55,11 +55,11 @@ pub struct OurAssignment {
}
impl OurAssignment {
pub(crate) fn cert(&self) -> &AssignmentCertV2 {
pub fn cert(&self) -> &AssignmentCertV2 {
&self.cert
}
pub(crate) fn tranche(&self) -> DelayTranche {
pub fn tranche(&self) -> DelayTranche {
self.tranche
}
@@ -225,7 +225,7 @@ fn assigned_core_transcript(core_index: CoreIndex) -> Transcript {
/// Information about the world assignments are being produced in.
#[derive(Clone, Debug)]
pub(crate) struct Config {
pub struct Config {
/// The assignment public keys for validators.
assignment_keys: Vec<AssignmentId>,
/// The groups of validators assigned to each core.
@@ -321,7 +321,7 @@ impl AssignmentCriteria for RealAssignmentCriteria {
/// different times. The idea is that most assignments are never triggered and fall by the wayside.
///
/// This will not assign to anything the local validator was part of the backing group for.
pub(crate) fn compute_assignments(
pub fn compute_assignments(
keystore: &LocalKeystore,
relay_vrf_story: RelayVRFStory,
config: &Config,
+28 -13
@@ -92,11 +92,11 @@ use time::{slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClo
mod approval_checking;
pub mod approval_db;
mod backend;
mod criteria;
pub mod criteria;
mod import;
mod ops;
mod persisted_entries;
mod time;
pub mod time;
use crate::{
approval_checking::{Check, TranchesToApproveResult},
@@ -159,6 +159,7 @@ pub struct ApprovalVotingSubsystem {
db: Arc<dyn Database>,
mode: Mode,
metrics: Metrics,
clock: Box<dyn Clock + Send + Sync>,
}
#[derive(Clone)]
@@ -444,6 +445,25 @@ impl ApprovalVotingSubsystem {
keystore: Arc<LocalKeystore>,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
) -> Self {
ApprovalVotingSubsystem::with_config_and_clock(
config,
db,
keystore,
sync_oracle,
metrics,
Box::new(SystemClock {}),
)
}
/// Create a new approval voting subsystem with the given keystore, config, and database.
pub fn with_config_and_clock(
config: Config,
db: Arc<dyn Database>,
keystore: Arc<LocalKeystore>,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
clock: Box<dyn Clock + Send + Sync>,
) -> Self {
ApprovalVotingSubsystem {
keystore,
@@ -452,6 +472,7 @@ impl ApprovalVotingSubsystem {
db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
mode: Mode::Syncing(sync_oracle),
metrics,
clock,
}
}
@@ -493,15 +514,10 @@ fn db_sanity_check(db: Arc<dyn Database>, config: DatabaseConfig) -> SubsystemRe
impl<Context: Send> ApprovalVotingSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = DbBackend::new(self.db.clone(), self.db_config);
let future = run::<DbBackend, Context>(
ctx,
self,
Box::new(SystemClock),
Box::new(RealAssignmentCriteria),
backend,
)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
.boxed();
let future =
run::<DbBackend, Context>(ctx, self, Box::new(RealAssignmentCriteria), backend)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
.boxed();
SpawnedSubsystem { name: "approval-voting-subsystem", future }
}
@@ -909,7 +925,6 @@ enum Action {
async fn run<B, Context>(
mut ctx: Context,
mut subsystem: ApprovalVotingSubsystem,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
mut backend: B,
) -> SubsystemResult<()>
@@ -923,7 +938,7 @@ where
let mut state = State {
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
clock,
clock: subsystem.clock,
assignment_criteria,
spans: HashMap::new(),
};
@@ -549,7 +549,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
let subsystem = run(
context,
ApprovalVotingSubsystem::with_config(
ApprovalVotingSubsystem::with_config_and_clock(
Config {
col_approval_data: test_constants::TEST_CONFIG.col_approval_data,
slot_duration_millis: SLOT_DURATION_MILLIS,
@@ -558,8 +558,8 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
Arc::new(keystore),
sync_oracle,
Metrics::default(),
clock.clone(),
),
clock.clone(),
assignment_criteria,
backend,
);
+18 -6
@@ -33,14 +33,14 @@ use std::{
};
use polkadot_primitives::{Hash, ValidatorIndex};
const TICK_DURATION_MILLIS: u64 = 500;
pub const TICK_DURATION_MILLIS: u64 = 500;
/// A base unit of time, starting from the Unix epoch, split into half-second intervals.
pub(crate) type Tick = u64;
pub type Tick = u64;
/// A clock which allows querying of the current tick as well as
/// waiting for a tick to be reached.
pub(crate) trait Clock {
pub trait Clock {
/// Yields the current tick.
fn tick_now(&self) -> Tick;
@@ -49,7 +49,7 @@ pub(crate) trait Clock {
}
/// Extension methods for clocks.
pub(crate) trait ClockExt {
pub trait ClockExt {
fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche;
}
@@ -61,7 +61,8 @@ impl<C: Clock + ?Sized> ClockExt for C {
}
/// A clock which uses the actual underlying system clock.
pub(crate) struct SystemClock;
#[derive(Clone)]
pub struct SystemClock;
impl Clock for SystemClock {
/// Yields the current tick.
@@ -93,11 +94,22 @@ fn tick_to_time(tick: Tick) -> SystemTime {
}
/// assumes `slot_duration_millis` evenly divided by tick duration.
pub(crate) fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
pub fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick {
let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
u64::from(slot) * ticks_per_slot
}
/// Converts a tick to the slot number.
pub fn tick_to_slot_number(slot_duration_millis: u64, tick: Tick) -> Slot {
let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS;
(tick / ticks_per_slot).into()
}
/// Converts a tranche from a slot to the tick number.
pub fn tranche_to_tick(slot_duration_millis: u64, slot: Slot, tranche: u32) -> Tick {
slot_number_to_tick(slot_duration_millis, slot) + tranche as u64
}
/// A list of delayed futures that gets triggered when the waiting time has expired and it is
/// time to sign the candidate.
/// We have a timer per relay-chain block.
+13
@@ -38,6 +38,9 @@ sp-core = { path = "../../../substrate/primitives/core" }
clap = { version = "4.4.18", features = ["derive"] }
futures = "0.3.21"
futures-timer = "3.0.2"
bincode = "1.3.3"
sha1 = "0.10.6"
hex = "0.4.3"
gum = { package = "tracing-gum", path = "../gum" }
polkadot-erasure-coding = { package = "polkadot-erasure-coding", path = "../../erasure-coding" }
log = "0.4.17"
@@ -64,6 +67,16 @@ prometheus_endpoint = { package = "substrate-prometheus-endpoint", path = "../..
prometheus = { version = "0.13.0", default-features = false }
serde = "1.0.195"
serde_yaml = "0.9"
polkadot-node-core-approval-voting = { path = "../core/approval-voting" }
polkadot-approval-distribution = { path = "../network/approval-distribution" }
sp-consensus-babe = { path = "../../../substrate/primitives/consensus/babe" }
sp-runtime = { path = "../../../substrate/primitives/runtime", default-features = false }
sp-timestamp = { path = "../../../substrate/primitives/timestamp" }
schnorrkel = { version = "0.9.1", default-features = false }
rand_core = "0.6.2" # should match schnorrkel
rand_chacha = { version = "0.3.1" }
paste = "1.0.14"
orchestra = { version = "0.3.5", default-features = false, features = ["futures_channel"] }
pyroscope = "0.5.7"
@@ -0,0 +1,18 @@
TestConfiguration:
# Test 1
- objective: !ApprovalVoting
last_considered_tranche: 89
coalesce_mean: 3.0
coalesce_std_dev: 1.0
stop_when_approved: true
coalesce_tranche_diff: 12
workdir_prefix: "/tmp/"
enable_assignments_v2: true
num_no_shows_per_candidate: 10
n_validators: 500
n_cores: 100
min_pov_size: 1120
max_pov_size: 5120
peer_bandwidth: 524288000000
bandwidth: 524288000000
num_blocks: 10
@@ -0,0 +1,19 @@
TestConfiguration:
# Test 1
- objective: !ApprovalVoting
coalesce_mean: 3.0
coalesce_std_dev: 1.0
enable_assignments_v2: true
last_considered_tranche: 89
stop_when_approved: false
coalesce_tranche_diff: 12
workdir_prefix: "/tmp"
num_no_shows_per_candidate: 0
n_validators: 500
n_cores: 100
n_included_candidates: 100
min_pov_size: 1120
max_pov_size: 5120
peer_bandwidth: 524288000000
bandwidth: 524288000000
num_blocks: 10
@@ -0,0 +1,18 @@
TestConfiguration:
# Test 1
- objective: !ApprovalVoting
coalesce_mean: 3.0
coalesce_std_dev: 1.0
enable_assignments_v2: true
last_considered_tranche: 89
stop_when_approved: true
coalesce_tranche_diff: 12
workdir_prefix: "/tmp/"
num_no_shows_per_candidate: 0
n_validators: 500
n_cores: 100
min_pov_size: 1120
max_pov_size: 5120
peer_bandwidth: 524288000000
bandwidth: 524288000000
num_blocks: 10
@@ -0,0 +1,18 @@
TestConfiguration:
# Test 1
- objective: !ApprovalVoting
coalesce_mean: 1.0
coalesce_std_dev: 0.0
enable_assignments_v2: false
last_considered_tranche: 89
stop_when_approved: false
coalesce_tranche_diff: 12
workdir_prefix: "/tmp/"
num_no_shows_per_candidate: 0
n_validators: 500
n_cores: 100
min_pov_size: 1120
max_pov_size: 5120
peer_bandwidth: 524288000000
bandwidth: 524288000000
num_blocks: 10
@@ -0,0 +1,207 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
use crate::core::configuration::TestAuthorities;
use itertools::Itertools;
use polkadot_node_core_approval_voting::time::{Clock, SystemClock, Tick};
use polkadot_node_network_protocol::{
grid_topology::{SessionGridTopology, TopologyPeerInfo},
View,
};
use polkadot_node_subsystem_types::messages::{
network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, NetworkBridgeEvent,
};
use polkadot_overseer::AllMessages;
use polkadot_primitives::{
BlockNumber, CandidateEvent, CandidateReceipt, CoreIndex, GroupIndex, Hash, Header,
Id as ParaId, Slot, ValidatorIndex,
};
use polkadot_primitives_test_helpers::dummy_candidate_receipt_bad_sig;
use rand::{seq::SliceRandom, SeedableRng};
use rand_chacha::ChaCha20Rng;
use sc_network::PeerId;
use sp_consensus_babe::{
digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, VrfSignature, VrfTranscript,
};
use sp_core::crypto::VrfSecret;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use sp_runtime::{Digest, DigestItem};
use std::sync::{atomic::AtomicU64, Arc};
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
/// A fake system clock used for driving the approval voting and make
/// it process blocks, assignments and approvals from the past.
#[derive(Clone)]
pub struct PastSystemClock {
/// The real system clock
real_system_clock: SystemClock,
/// The difference in ticks between the real system clock and the current clock.
delta_ticks: Arc<AtomicU64>,
}
impl PastSystemClock {
/// Creates a new fake system clock with `delta_ticks` between the real time and the fake one.
pub fn new(real_system_clock: SystemClock, delta_ticks: Arc<AtomicU64>) -> Self {
PastSystemClock { real_system_clock, delta_ticks }
}
}
impl Clock for PastSystemClock {
fn tick_now(&self) -> Tick {
self.real_system_clock.tick_now() -
self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst)
}
fn wait(
&self,
tick: Tick,
) -> std::pin::Pin<Box<dyn futures::prelude::Future<Output = ()> + Send + 'static>> {
self.real_system_clock
.wait(tick + self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst))
}
}
/// Helper function to generate a babe epoch for this benchmark.
/// It does not change for the duration of the test.
pub fn generate_babe_epoch(current_slot: Slot, authorities: TestAuthorities) -> BabeEpoch {
let authorities = authorities
.validator_babe_id
.into_iter()
.enumerate()
.map(|(index, public)| (public, index as u64))
.collect_vec();
BabeEpoch {
epoch_index: 1,
start_slot: current_slot.saturating_sub(1u64),
duration: 200,
authorities,
randomness: [0xde; 32],
config: BabeEpochConfiguration { c: (1, 4), allowed_slots: AllowedSlots::PrimarySlots },
}
}
/// Generates a topology to be used for this benchmark.
pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology {
let keyrings = test_authorities
.validator_authority_id
.clone()
.into_iter()
.zip(test_authorities.peer_ids.clone())
.collect_vec();
let topology = keyrings
.clone()
.into_iter()
.enumerate()
.map(|(index, (discovery_id, peer_id))| TopologyPeerInfo {
peer_ids: vec![peer_id],
validator_index: ValidatorIndex(index as u32),
discovery_id,
})
.collect_vec();
let shuffled = (0..keyrings.len()).collect_vec();
SessionGridTopology::new(shuffled, topology)
}
/// Generates new session topology message.
pub fn generate_new_session_topology(
test_authorities: &TestAuthorities,
test_node: ValidatorIndex,
) -> Vec<AllMessages> {
let topology = generate_topology(test_authorities);
let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session: 1,
topology,
local_index: Some(test_node),
});
vec![AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(event))]
}
/// Generates a peer view change for the passed `block_hash`
pub fn generate_peer_view_change_for(block_hash: Hash, peer_id: PeerId) -> AllMessages {
let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0));
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(network))
}
/// Helper function to create a signature for the block header.
fn garbage_vrf_signature() -> VrfSignature {
let transcript = VrfTranscript::new(b"test-garbage", &[]);
Sr25519Keyring::Alice.pair().vrf_sign(&transcript.into())
}
/// Helper function to create a block header.
pub fn make_header(parent_hash: Hash, slot: Slot, number: u32) -> Header {
let digest =
{
let mut digest = Digest::default();
let vrf_signature = garbage_vrf_signature();
digest.push(DigestItem::babe_pre_digest(PreDigest::SecondaryVRF(
SecondaryVRFPreDigest { authority_index: 0, slot, vrf_signature },
)));
digest
};
Header {
digest,
extrinsics_root: Default::default(),
number,
state_root: Default::default(),
parent_hash,
}
}
/// Helper function to create a candidate receipt.
fn make_candidate(para_id: ParaId, hash: &Hash) -> CandidateReceipt {
let mut r = dummy_candidate_receipt_bad_sig(*hash, Some(Default::default()));
r.descriptor.para_id = para_id;
r
}
/// Helper function to create a list of candidates that are included in the block
pub fn make_candidates(
block_hash: Hash,
block_number: BlockNumber,
num_cores: u32,
num_candidates: u32,
) -> Vec<CandidateEvent> {
let seed = [block_number as u8; 32];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let mut candidates = (0..num_cores)
.map(|core| {
CandidateEvent::CandidateIncluded(
make_candidate(ParaId::from(core), &block_hash),
Vec::new().into(),
CoreIndex(core),
GroupIndex(core),
)
})
.collect_vec();
let (candidates, _) = candidates.partial_shuffle(&mut rand_chacha, num_candidates as usize);
candidates
.iter_mut()
.map(|val| val.clone())
.sorted_by(|a, b| match (a, b) {
(
CandidateEvent::CandidateIncluded(_, _, core_a, _),
CandidateEvent::CandidateIncluded(_, _, core_b, _),
) => core_a.0.cmp(&core_b.0),
(_, _) => todo!("Should not happen"),
})
.collect_vec()
}
@@ -0,0 +1,686 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{
cmp::max,
collections::{BTreeMap, HashSet},
fs,
io::Write,
path::{Path, PathBuf},
time::Duration,
};
use futures::SinkExt;
use itertools::Itertools;
use parity_scale_codec::Encode;
use polkadot_node_core_approval_voting::{
criteria::{compute_assignments, Config},
time::tranche_to_tick,
};
use polkadot_node_network_protocol::grid_topology::{
GridNeighbors, RandomRouting, RequiredRouting, SessionGridTopology,
};
use polkadot_node_primitives::approval::{
self,
v2::{CoreBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2},
};
use polkadot_primitives::{
vstaging::ApprovalVoteMultipleCandidates, CandidateEvent, CandidateHash, CandidateIndex,
CoreIndex, SessionInfo, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
};
use rand::{seq::SliceRandom, RngCore, SeedableRng};
use rand_chacha::ChaCha20Rng;
use rand_distr::{Distribution, Normal};
use sc_keystore::LocalKeystore;
use sc_network::PeerId;
use sha1::Digest;
use sp_application_crypto::AppCrypto;
use sp_consensus_babe::SlotDuration;
use sp_keystore::Keystore;
use sp_timestamp::Timestamp;
use super::{
test_message::{MessagesBundle, TestMessageInfo},
ApprovalTestState, ApprovalsOptions, BlockTestData,
};
use crate::{
approval::{
helpers::{generate_babe_epoch, generate_topology},
GeneratedState, BUFFER_FOR_GENERATION_MILLIS, LOG_TARGET, SLOT_DURATION_MILLIS,
},
core::{
configuration::{TestAuthorities, TestConfiguration, TestObjective},
mock::session_info_for_peers,
NODE_UNDER_TEST,
},
};
use polkadot_node_network_protocol::v3 as protocol_v3;
use polkadot_primitives::Hash;
use sc_service::SpawnTaskHandle;
/// A generator of messages coming from a given Peer/Validator
pub struct PeerMessagesGenerator {
/// The grid neighbors of the node under test.
pub topology_node_under_test: GridNeighbors,
/// The topology of the network for the epoch under test.
pub topology: SessionGridTopology,
/// The index of the validator for which this object generates the messages.
pub validator_index: ValidatorIndex,
/// An array of pre-generated random samplings, used to determine which nodes would
/// send a given assignment to the node under test because of random sampling.
/// As an optimization we generate these samplings at the beginning of the test and just pick
/// one randomly, because always taking the samples would be too expensive for the benchmark.
pub random_samplings: Vec<Vec<ValidatorIndex>>,
/// Channel for sending the generated messages to the aggregator
pub tx_messages: futures::channel::mpsc::UnboundedSender<(Hash, Vec<MessagesBundle>)>,
/// The list of test authorities
pub test_authorities: TestAuthorities,
/// The session info used for the test.
pub session_info: SessionInfo,
/// The blocks used for testing
pub blocks: Vec<BlockTestData>,
/// Approval options params.
pub options: ApprovalsOptions,
}
impl PeerMessagesGenerator {
/// Generates messages by spawning a blocking task in the background which begins creating
/// the assignments/approvals and peer view changes at the beginning of each block.
pub fn generate_messages(mut self, spawn_task_handle: &SpawnTaskHandle) {
spawn_task_handle.spawn("generate-messages", "generate-messages", async move {
for block_info in &self.blocks {
let assignments = self.generate_assignments(block_info);
let bytes = self.validator_index.0.to_be_bytes();
let seed = [
bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let approvals = issue_approvals(
assignments,
block_info.hash,
&self.test_authorities.validator_public,
block_info.candidates.clone(),
&self.options,
&mut rand_chacha,
self.test_authorities.keyring.keystore_ref(),
);
self.tx_messages
.send((block_info.hash, approvals))
.await
.expect("Should not fail");
}
})
}
// Builds the messages fingerprint corresponding to this configuration.
// When the fingerprint already exists on disk the messages are not re-generated.
fn messages_fingerprint(
configuration: &TestConfiguration,
options: &ApprovalsOptions,
) -> String {
let mut fingerprint = options.fingerprint();
let mut exclude_objective = configuration.clone();
// The objective contains the full content of `ApprovalsOptions`; we don't want to put all of
// that in the fingerprint, so exclude it because we add it manually, see above.
exclude_objective.objective = TestObjective::Unimplemented;
let configuration_bytes = bincode::serialize(&exclude_objective).unwrap();
fingerprint.extend(configuration_bytes);
let mut sha1 = sha1::Sha1::new();
sha1.update(fingerprint);
let result = sha1.finalize();
hex::encode(result)
}
/// Generate all messages (assignments & approvals) needed for approving `blocks`.
pub fn generate_messages_if_needed(
configuration: &TestConfiguration,
test_authorities: &TestAuthorities,
options: &ApprovalsOptions,
spawn_task_handle: &SpawnTaskHandle,
) -> PathBuf {
let path_name = format!(
"{}/{}",
options.workdir_prefix,
Self::messages_fingerprint(configuration, options)
);
let path = Path::new(&path_name);
if path.exists() {
return path.to_path_buf();
}
gum::info!("Generate message because file does not exist");
let delta_to_first_slot_under_test = Timestamp::new(BUFFER_FOR_GENERATION_MILLIS);
let initial_slot = Slot::from_timestamp(
(*Timestamp::current() - *delta_to_first_slot_under_test).into(),
SlotDuration::from_millis(SLOT_DURATION_MILLIS),
);
let babe_epoch = generate_babe_epoch(initial_slot, test_authorities.clone());
let session_info = session_info_for_peers(configuration, test_authorities);
let blocks = ApprovalTestState::generate_blocks_information(
configuration,
&babe_epoch,
initial_slot,
);
gum::info!(target: LOG_TARGET, "Generate messages");
let topology = generate_topology(test_authorities);
let random_samplings = random_samplings_to_node(
ValidatorIndex(NODE_UNDER_TEST),
test_authorities.validator_public.len(),
test_authorities.validator_public.len() * 2,
);
let topology_node_under_test =
topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
let (tx, mut rx) = futures::channel::mpsc::unbounded();
// Spawn a thread to generate the messages for each validator, so that we speed up the
// generation.
for current_validator_index in 1..test_authorities.validator_public.len() {
let peer_message_source = PeerMessagesGenerator {
topology_node_under_test: topology_node_under_test.clone(),
topology: topology.clone(),
validator_index: ValidatorIndex(current_validator_index as u32),
test_authorities: test_authorities.clone(),
session_info: session_info.clone(),
blocks: blocks.clone(),
tx_messages: tx.clone(),
random_samplings: random_samplings.clone(),
options: options.clone(),
};
peer_message_source.generate_messages(spawn_task_handle);
}
std::mem::drop(tx);
let seed = [0x32; 32];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let mut all_messages: BTreeMap<u64, Vec<MessagesBundle>> = BTreeMap::new();
// Receive all messages and sort them by the tick at which they have to be sent.
loop {
match rx.try_next() {
Ok(Some((block_hash, messages))) =>
for message in messages {
let block_info = blocks
.iter()
.find(|val| val.hash == block_hash)
.expect("Should find blocks");
let tick_to_send = tranche_to_tick(
SLOT_DURATION_MILLIS,
block_info.slot,
message.tranche_to_send(),
);
let to_add = all_messages.entry(tick_to_send).or_default();
to_add.push(message);
},
Ok(None) => break,
Err(_) => {
std::thread::sleep(Duration::from_millis(50));
},
}
}
let all_messages = all_messages
.into_iter()
.flat_map(|(_, mut messages)| {
// Shuffle the messages inside the same tick, so that we don't prioritise messages
// for older nodes. We try to simulate the same behaviour as in the real world.
messages.shuffle(&mut rand_chacha);
messages
})
.collect_vec();
gum::info!("Generated a number of {:} unique messages", all_messages.len());
let generated_state = GeneratedState { all_messages: Some(all_messages), initial_slot };
let mut messages_file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.unwrap();
messages_file
.write_all(&generated_state.encode())
.expect("Could not update message file");
path.to_path_buf()
}
/// Generates assignments for the validator at `self.validator_index`.
/// Returns a list of assignments to be sent, sorted by tranche.
fn generate_assignments(&self, block_info: &BlockTestData) -> Vec<TestMessageInfo> {
let config = Config::from(&self.session_info);
let leaving_cores = block_info
.candidates
.clone()
.into_iter()
.map(|candidate_event| {
if let CandidateEvent::CandidateIncluded(candidate, _, core_index, group_index) =
candidate_event
{
(candidate.hash(), core_index, group_index)
} else {
todo!("Variant is never created in this benchmark")
}
})
.collect_vec();
let mut assignments_by_tranche = BTreeMap::new();
let bytes = self.validator_index.0.to_be_bytes();
let seed = [
bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let to_be_sent_by = neighbours_that_would_sent_message(
&self.test_authorities.peer_ids,
self.validator_index.0,
&self.topology_node_under_test,
&self.topology,
);
let leaving_cores = leaving_cores
.clone()
.into_iter()
.filter(|(_, core_index, _group_index)| core_index.0 != self.validator_index.0)
.collect_vec();
let store = LocalKeystore::in_memory();
let _public = store
.sr25519_generate_new(
ASSIGNMENT_KEY_TYPE_ID,
Some(self.test_authorities.key_seeds[self.validator_index.0 as usize].as_str()),
)
.expect("should not fail");
let assignments = compute_assignments(
&store,
block_info.relay_vrf_story.clone(),
&config,
leaving_cores.clone(),
self.options.enable_assignments_v2,
);
let random_sending_nodes = self
.random_samplings
.get(rand_chacha.next_u32() as usize % self.random_samplings.len())
.unwrap();
let random_sending_peer_ids = random_sending_nodes
.iter()
.map(|validator| (*validator, self.test_authorities.peer_ids[validator.0 as usize]))
.collect_vec();
let mut unique_assignments = HashSet::new();
for (core_index, assignment) in assignments {
let assigned_cores = match &assignment.cert().kind {
approval::v2::AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
core_bitfield.iter_ones().map(|val| CoreIndex::from(val as u32)).collect_vec(),
approval::v2::AssignmentCertKindV2::RelayVRFDelay { core_index } =>
vec![*core_index],
approval::v2::AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
vec![core_index],
};
let bitfield: CoreBitfield = assigned_cores.clone().try_into().unwrap();
// For the cases where tranche 0 assignments are in a single certificate we need to make
// sure we create a single message.
if unique_assignments.insert(bitfield) {
let this_tranche_assignments =
assignments_by_tranche.entry(assignment.tranche()).or_insert_with(Vec::new);
this_tranche_assignments.push((
IndirectAssignmentCertV2 {
block_hash: block_info.hash,
validator: self.validator_index,
cert: assignment.cert().clone(),
},
block_info
.candidates
.iter()
.enumerate()
.filter(|(_index, candidate)| {
if let CandidateEvent::CandidateIncluded(_, _, core, _) = candidate {
assigned_cores.contains(core)
} else {
panic!("Should not happen");
}
})
.map(|(index, _)| index as u32)
.collect_vec()
.try_into()
.unwrap(),
to_be_sent_by
.iter()
.chain(random_sending_peer_ids.iter())
.copied()
.collect::<HashSet<(ValidatorIndex, PeerId)>>(),
assignment.tranche(),
));
}
}
assignments_by_tranche
.into_values()
.flat_map(|assignments| assignments.into_iter())
.map(|assignment| {
let msg = protocol_v3::ApprovalDistributionMessage::Assignments(vec![(
assignment.0,
assignment.1,
)]);
TestMessageInfo {
msg,
sent_by: assignment
.2
.into_iter()
.map(|(validator_index, _)| validator_index)
.collect_vec(),
tranche: assignment.3,
block_hash: block_info.hash,
}
})
.collect_vec()
}
}
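The tick-ordering step in `generate_messages_if_needed` can be looked at in isolation. The sketch below is a simplified stand-in, not the benchmark's code: `TickBundle` and `order_by_tick` are hypothetical names, the tick is passed in directly instead of being computed with `tranche_to_tick`, and the per-tick shuffle is left out so the example needs only the standard library.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `MessagesBundle`: only an id and the tick to send at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TickBundle {
    pub id: u32,
    pub tick_to_send: u64,
}

/// Buckets bundles by tick and flattens them back in ascending tick order.
/// Within a tick the arrival order is preserved here; the benchmark shuffles
/// each bucket with a seeded RNG instead.
pub fn order_by_tick(bundles: Vec<TickBundle>) -> Vec<TickBundle> {
    let mut by_tick: BTreeMap<u64, Vec<TickBundle>> = BTreeMap::new();
    for bundle in bundles {
        by_tick.entry(bundle.tick_to_send).or_default().push(bundle);
    }
    // A BTreeMap iterates in key order, so flattening yields tick-sorted output.
    by_tick.into_values().flatten().collect()
}
```

Using an ordered map keeps the sort implicit: messages can arrive from the generator tasks in any order and still come out grouped by the tick at which they must be sent.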
/// A list of random samplings that we use to determine which nodes should send a given message to
/// the node under test.
/// We cannot sample every time for all the messages because that would be too expensive to
/// perform, so we pre-generate a list of samples for a given network size.
/// - `result[i]` gives us a list of random nodes that would send a given message to the node under
/// test.
fn random_samplings_to_node(
node_under_test: ValidatorIndex,
num_validators: usize,
num_samplings: usize,
) -> Vec<Vec<ValidatorIndex>> {
let seed = [7u8; 32];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
(0..num_samplings)
.map(|_| {
(0..num_validators)
.filter(|sending_validator_index| {
*sending_validator_index != NODE_UNDER_TEST as usize
})
.flat_map(|sending_validator_index| {
let mut validators = (0..num_validators).collect_vec();
validators.shuffle(&mut rand_chacha);
let mut random_routing = RandomRouting::default();
validators
.into_iter()
.flat_map(|validator_to_send| {
if random_routing.sample(num_validators, &mut rand_chacha) {
random_routing.inc_sent();
if validator_to_send == node_under_test.0 as usize {
Some(ValidatorIndex(sending_validator_index as u32))
} else {
None
}
} else {
None
}
})
.collect_vec()
})
.collect_vec()
})
.collect_vec()
}
/// Helper function to randomly determine how many approvals we coalesce together in a single
/// message.
fn coalesce_approvals_len(
coalesce_mean: f32,
coalesce_std_dev: f32,
rand_chacha: &mut ChaCha20Rng,
) -> usize {
max(
1,
Normal::new(coalesce_mean, coalesce_std_dev)
.expect("normal distribution parameters are good")
.sample(rand_chacha)
.round() as i32,
) as usize
}
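The clamping in `coalesce_approvals_len` matters because a normal distribution can produce zero or negative samples. A minimal sketch of just that step, assuming the sample has already been drawn; `clamp_coalesce_len` is a hypothetical name and takes the raw sample as input so the example does not depend on `rand_distr`:

```rust
/// Hypothetical helper: turns a raw sample from the normal distribution into a
/// coalescing length, mirroring the `max(1, sample.round() as i32) as usize` step.
pub fn clamp_coalesce_len(sample: f32) -> usize {
    // Round to the nearest integer, then make sure we always coalesce at least
    // one approval, even when the sample is zero or negative.
    std::cmp::max(1, sample.round() as i32) as usize
}
```

Going through `i32` before `usize` keeps negative samples representable until the `max` has been applied.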
/// Helper function to create approval signatures for all assignments passed as arguments.
/// Returns a list of Approvals messages that need to be sent.
fn issue_approvals(
assignments: Vec<TestMessageInfo>,
block_hash: Hash,
validator_ids: &[ValidatorId],
candidates: Vec<CandidateEvent>,
options: &ApprovalsOptions,
rand_chacha: &mut ChaCha20Rng,
store: &LocalKeystore,
) -> Vec<MessagesBundle> {
let mut queued_to_sign: Vec<TestSignInfo> = Vec::new();
let mut num_coalesce =
coalesce_approvals_len(options.coalesce_mean, options.coalesce_std_dev, rand_chacha);
let result = assignments
.iter()
.enumerate()
.map(|(_index, message)| match &message.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
let mut approvals_to_create = Vec::new();
let current_validator_index = queued_to_sign
.first()
.map(|msg| msg.validator_index)
.unwrap_or(ValidatorIndex(99999));
// Invariant for this benchmark.
assert_eq!(assignments.len(), 1);
let assignment = assignments.first().unwrap();
let earliest_tranche = queued_to_sign
.first()
.map(|val| val.assignment.tranche)
.unwrap_or(message.tranche);
if queued_to_sign.len() >= num_coalesce ||
(!queued_to_sign.is_empty() &&
current_validator_index != assignment.0.validator) ||
message.tranche - earliest_tranche >= options.coalesce_tranche_diff
{
approvals_to_create.push(TestSignInfo::sign_candidates(
&mut queued_to_sign,
validator_ids,
block_hash,
num_coalesce,
store,
));
num_coalesce = coalesce_approvals_len(
options.coalesce_mean,
options.coalesce_std_dev,
rand_chacha,
);
}
// If more than one candidate was in the assignment, queue all of them for issuing
// approvals.
for candidate_index in assignment.1.iter_ones() {
let candidate = candidates.get(candidate_index).unwrap();
if let CandidateEvent::CandidateIncluded(candidate, _, _, _) = candidate {
queued_to_sign.push(TestSignInfo {
candidate_hash: candidate.hash(),
candidate_index: candidate_index as CandidateIndex,
validator_index: assignment.0.validator,
assignment: message.clone(),
});
} else {
todo!("Other enum variants are not used in this benchmark");
}
}
approvals_to_create
},
_ => {
todo!("Other enum variants are not used in this benchmark");
},
})
.collect_vec();
let mut messages = result.into_iter().flatten().collect_vec();
if !queued_to_sign.is_empty() {
messages.push(TestSignInfo::sign_candidates(
&mut queued_to_sign,
validator_ids,
block_hash,
num_coalesce,
store,
));
}
messages
}
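The condition under which `issue_approvals` signs the queued candidates combines three checks. The sketch below restates it on plain integers; `QueueState` and `should_flush` are hypothetical names, and `saturating_sub` is a defensive choice of this sketch (the original subtracts directly, relying on assignments arriving sorted by tranche).

```rust
/// Hypothetical, simplified view of the queue inspected before adding an assignment.
pub struct QueueState {
    /// Number of candidates waiting to be signed.
    pub queued_len: usize,
    /// Validator that owns the queued candidates, if any are queued.
    pub queued_validator: Option<u32>,
    /// Tranche of the first queued assignment, if any are queued.
    pub earliest_tranche: Option<u32>,
}

/// Returns true when the queued candidates should be signed before queueing more.
pub fn should_flush(
    state: &QueueState,
    num_coalesce: usize,
    next_validator: u32,
    next_tranche: u32,
    coalesce_tranche_diff: u32,
) -> bool {
    // 1. Enough candidates are queued for one coalesced approval.
    let queue_full = state.queued_len >= num_coalesce;
    // 2. The next assignment belongs to a different validator.
    let validator_changed = state.queued_validator.map_or(false, |v| v != next_validator);
    // 3. The next assignment is too many tranches after the first queued one.
    let earliest = state.earliest_tranche.unwrap_or(next_tranche);
    let too_far_apart = next_tranche.saturating_sub(earliest) >= coalesce_tranche_diff;
    queue_full || validator_changed || too_far_apart
}
```

With an empty queue the third check compares a difference of zero against `coalesce_tranche_diff`, so a value of zero for that option would request a flush of nothing; the sketch assumes it is configured to be positive, as in the example configuration (`coalesce_tranche_diff: 12`).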
/// Helper struct to gather information about more than one candidate and sign it in a single
/// approval message.
struct TestSignInfo {
/// The candidate hash
candidate_hash: CandidateHash,
/// The candidate index
candidate_index: CandidateIndex,
/// The validator sending the assignments
validator_index: ValidatorIndex,
/// The assignment message covering this candidate
assignment: TestMessageInfo,
}
impl TestSignInfo {
/// Helper function to create a signature for all candidates in the `to_sign` parameter.
/// Returns a `MessagesBundle`.
fn sign_candidates(
to_sign: &mut Vec<TestSignInfo>,
validator_ids: &[ValidatorId],
block_hash: Hash,
num_coalesce: usize,
store: &LocalKeystore,
) -> MessagesBundle {
let current_validator_index = to_sign.first().map(|val| val.validator_index).unwrap();
let tranche_approval_can_be_sent =
to_sign.iter().map(|val| val.assignment.tranche).max().unwrap();
let validator_id = validator_ids.get(current_validator_index.0 as usize).unwrap().clone();
let unique_assignments: HashSet<TestMessageInfo> =
to_sign.iter().map(|info| info.assignment.clone()).collect();
let mut to_sign = to_sign
.drain(..)
.sorted_by(|val1, val2| val1.candidate_index.cmp(&val2.candidate_index))
.peekable();
let mut bundle = MessagesBundle {
assignments: unique_assignments.into_iter().collect_vec(),
approvals: Vec::new(),
};
while to_sign.peek().is_some() {
let to_sign = to_sign.by_ref().take(num_coalesce).collect_vec();
let hashes = to_sign.iter().map(|val| val.candidate_hash).collect_vec();
let candidate_indices = to_sign.iter().map(|val| val.candidate_index).collect_vec();
let sent_by = to_sign
.iter()
.flat_map(|val| val.assignment.sent_by.iter())
.copied()
.collect::<HashSet<ValidatorIndex>>();
let payload = ApprovalVoteMultipleCandidates(&hashes).signing_payload(1);
let signature = store
.sr25519_sign(ValidatorId::ID, &validator_id.clone().into(), &payload[..])
.unwrap()
.unwrap()
.into();
let indirect = IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: candidate_indices.try_into().unwrap(),
validator: current_validator_index,
signature,
};
let msg = protocol_v3::ApprovalDistributionMessage::Approvals(vec![indirect]);
bundle.approvals.push(TestMessageInfo {
msg,
sent_by: sent_by.into_iter().collect_vec(),
tranche: tranche_approval_can_be_sent,
block_hash,
});
}
bundle
}
}
/// Determine what neighbours would send a given message to the node under test.
fn neighbours_that_would_sent_message(
peer_ids: &[PeerId],
current_validator_index: u32,
topology_node_under_test: &GridNeighbors,
topology: &SessionGridTopology,
) -> Vec<(ValidatorIndex, PeerId)> {
let topology_originator = topology
.compute_grid_neighbors_for(ValidatorIndex(current_validator_index))
.unwrap();
let originator_y = topology_originator.validator_indices_y.iter().find(|validator| {
topology_node_under_test.required_routing_by_index(**validator, false) ==
RequiredRouting::GridY
});
assert!(originator_y != Some(&ValidatorIndex(NODE_UNDER_TEST)));
let originator_x = topology_originator.validator_indices_x.iter().find(|validator| {
topology_node_under_test.required_routing_by_index(**validator, false) ==
RequiredRouting::GridX
});
assert!(originator_x != Some(&ValidatorIndex(NODE_UNDER_TEST)));
let is_neighbour = topology_originator
.validator_indices_x
.contains(&ValidatorIndex(NODE_UNDER_TEST)) ||
topology_originator
.validator_indices_y
.contains(&ValidatorIndex(NODE_UNDER_TEST));
let mut to_be_sent_by = originator_y
.into_iter()
.chain(originator_x)
.map(|val| (*val, peer_ids[val.0 as usize]))
.collect_vec();
if is_neighbour {
to_be_sent_by
.push((ValidatorIndex(current_validator_index), peer_ids[current_validator_index as usize]));
}
to_be_sent_by
}
@@ -0,0 +1,66 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::approval::{LOG_TARGET, SLOT_DURATION_MILLIS};
use super::{ApprovalTestState, PastSystemClock};
use futures::FutureExt;
use polkadot_node_core_approval_voting::time::{slot_number_to_tick, Clock, TICK_DURATION_MILLIS};
use polkadot_node_subsystem::{overseer, SpawnedSubsystem, SubsystemError};
use polkadot_node_subsystem_types::messages::ChainSelectionMessage;
/// Mock ChainSelection subsystem used to answer requests made by the approval-voting subsystem
/// during the benchmark. All the necessary information to answer the requests is stored in `state`.
pub struct MockChainSelection {
pub state: ApprovalTestState,
pub clock: PastSystemClock,
}
#[overseer::subsystem(ChainSelection, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockChainSelection {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "mock-chain-subsystem", future }
}
}
#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
impl MockChainSelection {
async fn run<Context>(self, mut ctx: Context) {
loop {
let msg = ctx.recv().await.expect("Should not fail");
match msg {
orchestra::FromOrchestra::Signal(_) => {},
orchestra::FromOrchestra::Communication { msg } =>
if let ChainSelectionMessage::Approved(hash) = msg {
let block_info = self.state.get_info_by_hash(hash);
let approved_number = block_info.block_number;
block_info.approved.store(true, std::sync::atomic::Ordering::SeqCst);
self.state
.last_approved_block
.store(approved_number, std::sync::atomic::Ordering::SeqCst);
let approved_in_tick = self.clock.tick_now() -
slot_number_to_tick(SLOT_DURATION_MILLIS, block_info.slot);
gum::info!(target: LOG_TARGET, ?hash, "Chain selection approved after {:} ms", approved_in_tick * TICK_DURATION_MILLIS);
},
}
}
}
}
File diff suppressed because it is too large
@@ -0,0 +1,304 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::{ApprovalsOptions, BlockTestData, CandidateTestData};
use crate::core::configuration::TestAuthorities;
use itertools::Itertools;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_network_protocol::v3 as protocol_v3;
use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex};
use sc_network::PeerId;
use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct TestMessageInfo {
/// The actual message
pub msg: protocol_v3::ApprovalDistributionMessage,
/// The list of peers that would send this message in a real topology.
/// It includes both the peers that would send the message because of the topology
/// and the peers chosen by random sampling.
pub sent_by: Vec<ValidatorIndex>,
/// The tranche at which this message should be sent.
pub tranche: u32,
/// The block hash this message refers to.
pub block_hash: Hash,
}
impl std::hash::Hash for TestMessageInfo {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
for (assignment, candidates) in assignments {
(assignment.block_hash, assignment.validator).hash(state);
candidates.hash(state);
}
},
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => {
for approval in approvals {
(approval.block_hash, approval.validator).hash(state);
approval.candidate_indices.hash(state);
}
},
};
}
}
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
/// A list of messages that depend on each other: approvals cover one of the assignments and
/// vice versa.
pub struct MessagesBundle {
pub assignments: Vec<TestMessageInfo>,
pub approvals: Vec<TestMessageInfo>,
}
impl MessagesBundle {
/// The tranche when this bundle can be sent correctly, so no assignments or approvals will be
/// from the future.
pub fn tranche_to_send(&self) -> u32 {
self.assignments
.iter()
.chain(self.approvals.iter())
.max_by(|a, b| a.tranche.cmp(&b.tranche))
.unwrap()
.tranche
}
/// The min tranche in the bundle.
pub fn min_tranche(&self) -> u32 {
self.assignments
.iter()
.chain(self.approvals.iter())
.min_by(|a, b| a.tranche.cmp(&b.tranche))
.unwrap()
.tranche
}
/// Tells if the bundle is needed for sending.
/// We either send it because we need more assignments and approvals to approve the candidates
/// or because we configured the test to send messages until a given tranche.
pub fn should_send(
&self,
candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
options: &ApprovalsOptions,
) -> bool {
self.needed_for_approval(candidates_test_data) ||
(!options.stop_when_approved &&
self.min_tranche() <= options.last_considered_tranche)
}
/// Tells if the bundle is needed because we need more messages to approve the candidates.
pub fn needed_for_approval(
&self,
candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
) -> bool {
self.assignments
.iter()
.any(|message| message.needed_for_approval(candidates_test_data))
}
/// Mark the assignments in the bundle as sent.
pub fn record_sent_assignment(
&self,
candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
) {
self.assignments
.iter()
.for_each(|assignment| assignment.record_sent_assignment(candidates_test_data));
}
}
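The sending rule implemented by `tranche_to_send`, `min_tranche` and `should_send` can be restated on plain tranche numbers. This is a minimal sketch under stated assumptions: `TrancheBundle` is a hypothetical type holding only tranches, the result of `needed_for_approval` is passed in as a boolean, and the accessors return `Option` rather than unwrapping as the original does.

```rust
/// Hypothetical stand-in for `MessagesBundle` that keeps only the tranches.
pub struct TrancheBundle {
    pub assignment_tranches: Vec<u32>,
    pub approval_tranches: Vec<u32>,
}

impl TrancheBundle {
    /// All tranches in the bundle, assignments first.
    fn tranches(&self) -> impl Iterator<Item = u32> + '_ {
        self.assignment_tranches.iter().chain(self.approval_tranches.iter()).copied()
    }

    /// Latest tranche in the bundle: sending at this point means no message
    /// in the bundle is from the future.
    pub fn tranche_to_send(&self) -> Option<u32> {
        self.tranches().max()
    }

    /// Earliest tranche in the bundle.
    pub fn min_tranche(&self) -> Option<u32> {
        self.tranches().min()
    }

    /// Send when the bundle is needed for approval, or when the test keeps
    /// sending after approval and the bundle starts within the considered range.
    pub fn should_send(
        &self,
        needed_for_approval: bool,
        stop_when_approved: bool,
        last_considered_tranche: u32,
    ) -> bool {
        needed_for_approval ||
            (!stop_when_approved &&
                self.min_tranche().map_or(false, |t| t <= last_considered_tranche))
    }
}
```

Note that the send time uses the maximum tranche while the cut-off check uses the minimum, so a bundle that starts inside the considered range is still sent even if part of it lies beyond `last_considered_tranche`.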
impl TestMessageInfo {
/// Tells if the message is an approval.
fn is_approval(&self) -> bool {
match self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(_) => false,
protocol_v3::ApprovalDistributionMessage::Approvals(_) => true,
}
}
/// Records an approval.
/// We use this to check, after all messages have been processed, that we didn't lose any
/// message.
pub fn record_vote(&self, state: &BlockTestData) {
if self.is_approval() {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(),
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
for approval in approvals {
for candidate_index in approval.candidate_indices.iter_ones() {
state
.votes
.get(approval.validator.0 as usize)
.unwrap()
.get(candidate_index)
.unwrap()
.store(true, std::sync::atomic::Ordering::SeqCst);
}
},
}
}
}
/// Mark the assignments in the message as sent.
pub fn record_sent_assignment(
&self,
candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
) {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
for (assignment, candidate_indices) in assignments {
for candidate_index in candidate_indices.iter_ones() {
let candidate_test_data = candidates_test_data
.get_mut(&(assignment.block_hash, candidate_index as CandidateIndex))
.unwrap();
candidate_test_data.mark_sent_assignment(self.tranche)
}
}
},
protocol_v3::ApprovalDistributionMessage::Approvals(_approvals) => todo!(),
}
}
/// Returns the set of candidate indices in this message.
pub fn candidate_indices(&self) -> HashSet<usize> {
let mut unique_candidate_indices = HashSet::new();
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) =>
for (_assignment, candidate_indices) in assignments {
for candidate_index in candidate_indices.iter_ones() {
unique_candidate_indices.insert(candidate_index);
}
},
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
for approval in approvals {
for candidate_index in approval.candidate_indices.iter_ones() {
unique_candidate_indices.insert(candidate_index);
}
},
}
unique_candidate_indices
}
/// Marks this message as a no-show if the number of configured no-shows is above the number of
/// registered no-shows.
/// Returns true if the message is a no-show.
pub fn no_show_if_required(
&self,
assignments: &[TestMessageInfo],
candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
) -> bool {
let mut should_no_show = false;
if self.is_approval() {
let covered_candidates = assignments
.iter()
.map(|assignment| (assignment, assignment.candidate_indices()))
.collect_vec();
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(),
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => {
assert_eq!(approvals.len(), 1);
for approval in approvals {
should_no_show = should_no_show ||
approval.candidate_indices.iter_ones().all(|candidate_index| {
let candidate_test_data = candidates_test_data
.get_mut(&(
approval.block_hash,
candidate_index as CandidateIndex,
))
.unwrap();
let assignment = covered_candidates
.iter()
.find(|(_assignment, candidates)| {
candidates.contains(&candidate_index)
})
.unwrap();
candidate_test_data.should_no_show(assignment.0.tranche)
});
if should_no_show {
for candidate_index in approval.candidate_indices.iter_ones() {
let candidate_test_data = candidates_test_data
.get_mut(&(
approval.block_hash,
candidate_index as CandidateIndex,
))
.unwrap();
let assignment = covered_candidates
.iter()
.find(|(_assignment, candidates)| {
candidates.contains(&candidate_index)
})
.unwrap();
candidate_test_data.record_no_show(assignment.0.tranche)
}
}
}
},
}
}
should_no_show
}
/// Tells if a message is needed for approval
pub fn needed_for_approval(
&self,
candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
) -> bool {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) =>
assignments.iter().any(|(assignment, candidate_indices)| {
candidate_indices.iter_ones().any(|candidate_index| {
candidates_test_data
.get(&(assignment.block_hash, candidate_index as CandidateIndex))
.map(|data| data.should_send_tranche(self.tranche))
.unwrap_or_default()
})
}),
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
approvals.iter().any(|approval| {
approval.candidate_indices.iter_ones().any(|candidate_index| {
candidates_test_data
.get(&(approval.block_hash, candidate_index as CandidateIndex))
.map(|data| data.should_send_tranche(self.tranche))
.unwrap_or_default()
})
}),
}
}
/// Splits a message into multiple messages based on which peers should send this message.
/// It builds a HashMap of messages that should be sent by each peer.
pub fn split_by_peer_id(
self,
authorities: &TestAuthorities,
) -> HashMap<(ValidatorIndex, PeerId), Vec<TestMessageInfo>> {
let mut result: HashMap<(ValidatorIndex, PeerId), Vec<TestMessageInfo>> = HashMap::new();
for validator_index in &self.sent_by {
let peer = authorities.peer_ids.get(validator_index.0 as usize).unwrap();
result.entry((*validator_index, *peer)).or_default().push(TestMessageInfo {
msg: self.msg.clone(),
sent_by: Default::default(),
tranche: self.tranche,
block_hash: self.block_hash,
});
}
result
}
}
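The fan-out done by `split_by_peer_id` amounts to copying one message into a per-sender bucket. A minimal sketch, assuming senders are identified by a bare `u32` and the message by a string; `split_by_sender` is a hypothetical name and the `PeerId` half of the original key is omitted:

```rust
use std::collections::HashMap;

/// Hypothetical simplification of `split_by_peer_id`: one copy of `payload`
/// is queued for every validator index listed in `sent_by`.
pub fn split_by_sender(payload: &str, sent_by: &[u32]) -> HashMap<u32, Vec<String>> {
    let mut result: HashMap<u32, Vec<String>> = HashMap::new();
    for sender in sent_by {
        // Each sender gets its own copy, as it would deliver the message itself.
        result.entry(*sender).or_default().push(payload.to_string());
    }
    result
}
```

A sender listed twice would receive two copies; in the benchmark `sent_by` is collected from a `HashSet`, so entries are expected to be unique.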
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::core::mock::TestSyncOracle;
use super::*;
use polkadot_node_metrics::metrics::Metrics;
@@ -31,22 +33,10 @@ mod columns {
const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META };
struct DumbOracle;
impl sp_consensus::SyncOracle for DumbOracle {
fn is_major_syncing(&self) -> bool {
false
}
fn is_offline(&self) -> bool {
unimplemented!("oh no!")
}
}
pub fn new_av_store(dependencies: &TestEnvironmentDependencies) -> AvailabilityStoreSubsystem {
let metrics = Metrics::try_register(&dependencies.registry).unwrap();
AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(DumbOracle), metrics)
AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(TestSyncOracle {}), metrics)
}
fn test_store() -> Arc<dyn Database> {
@@ -25,7 +25,7 @@ use polkadot_node_subsystem_types::{
messages::{AvailabilityStoreMessage, NetworkBridgeEvent},
Span,
};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle};
use sc_network::{request_responses::ProtocolConfig, PeerId};
use sp_core::H256;
use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant};
@@ -85,9 +85,12 @@ fn build_overseer_for_availability_read(
av_store: MockAvailabilityStore,
network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx),
availability_recovery: AvailabilityRecoverySubsystem,
dependencies: &TestEnvironmentDependencies,
) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
let overseer_connector = OverseerConnector::with_event_capacity(64000);
let dummy = dummy_builder!(spawn_task_handle);
let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
let builder = dummy
.replace_runtime_api(|_| runtime_api)
.replace_availability_store(|_| av_store)
@@ -101,6 +104,7 @@ fn build_overseer_for_availability_read(
(overseer, OverseerHandle::new(raw_handle))
}
#[allow(clippy::too_many_arguments)]
fn build_overseer_for_availability_write(
spawn_task_handle: SpawnTaskHandle,
runtime_api: MockRuntimeApi,
@@ -109,9 +113,12 @@ fn build_overseer_for_availability_write(
chain_api: MockChainApi,
availability_store: AvailabilityStoreSubsystem,
bitfield_distribution: BitfieldDistribution,
dependencies: &TestEnvironmentDependencies,
) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
let overseer_connector = OverseerConnector::with_event_capacity(64000);
let dummy = dummy_builder!(spawn_task_handle);
let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
let builder = dummy
.replace_runtime_api(|_| runtime_api)
.replace_availability_store(|_| availability_store)
@@ -171,6 +178,9 @@ fn prepare_test_inner(
config.clone(),
test_authorities.clone(),
candidate_hashes,
Default::default(),
Default::default(),
0,
);
let availability_state = NetworkAvailabilityState {
@@ -198,6 +208,7 @@ fn prepare_test_inner(
let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
network.clone(),
network_interface.subsystem_sender(),
test_authorities.clone(),
);
let network_bridge_rx =
@@ -231,6 +242,7 @@ fn prepare_test_inner(
av_store,
(network_bridge_tx, network_bridge_rx),
subsystem,
&dependencies,
)
},
TestObjective::DataAvailabilityWrite => {
@@ -240,7 +252,7 @@ fn prepare_test_inner(
Metrics::try_register(&dependencies.registry).unwrap(),
);
let block_headers = (0..=config.num_blocks)
let block_headers = (1..=config.num_blocks)
.map(|block_number| {
(
Hash::repeat_byte(block_number as u8),
@@ -267,6 +279,7 @@ fn prepare_test_inner(
chain_api,
new_av_store(&dependencies),
bitfield_distribution,
&dependencies,
)
},
_ => {
@@ -614,9 +627,10 @@ pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state:
);
// Wait for all bitfields to be processed.
env.wait_until_metric_eq(
env.wait_until_metric(
"polkadot_parachain_received_availabilty_bitfields_total",
config.connected_count() * block_num,
None,
|value| value == (config.connected_count() * block_num) as f64,
)
.await;
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::availability::DataAvailabilityReadOptions;
use crate::approval::ApprovalsOptions;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
@@ -34,6 +35,9 @@ pub enum TestObjective {
DataAvailabilityWrite,
/// Run a test sequence specified in a file
TestSequence(TestSequenceOptions),
/// Benchmark the approval-voting and approval-distribution subsystems.
ApprovalVoting(ApprovalsOptions),
Unimplemented,
}
#[derive(Debug, clap::Parser)]
@@ -16,11 +16,14 @@
//
//! Test configuration definition and helpers.
use super::*;
use itertools::Itertools;
use keyring::Keyring;
use std::path::Path;
use sc_network::PeerId;
use sp_consensus_babe::AuthorityId;
use std::{collections::HashMap, path::Path};
pub use crate::cli::TestObjective;
use polkadot_primitives::{AuthorityDiscoveryId, ValidatorId};
use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId};
use rand::thread_rng;
use rand_distr::{Distribution, Normal, Uniform};
@@ -65,6 +68,25 @@ fn default_backing_group_size() -> usize {
5
}
// Default needed approvals
fn default_needed_approvals() -> usize {
30
}
fn default_zeroth_delay_tranche_width() -> usize {
0
}
fn default_relay_vrf_modulo_samples() -> usize {
6
}
fn default_n_delay_tranches() -> usize {
89
}
fn default_no_show_slots() -> usize {
3
}
/// The test input parameters
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TestConfiguration {
@@ -74,6 +96,17 @@ pub struct TestConfiguration {
pub n_validators: usize,
/// Number of cores
pub n_cores: usize,
/// The number of needed votes to approve a candidate.
#[serde(default = "default_needed_approvals")]
pub needed_approvals: usize,
#[serde(default = "default_zeroth_delay_tranche_width")]
pub zeroth_delay_tranche_width: usize,
#[serde(default = "default_relay_vrf_modulo_samples")]
pub relay_vrf_modulo_samples: usize,
#[serde(default = "default_n_delay_tranches")]
pub n_delay_tranches: usize,
#[serde(default = "default_no_show_slots")]
pub no_show_slots: usize,
/// Maximum backing group size
#[serde(default = "default_backing_group_size")]
pub max_validators_per_core: usize,
@@ -139,6 +172,11 @@ pub struct TestAuthorities {
pub keyring: Keyring,
pub validator_public: Vec<ValidatorId>,
pub validator_authority_id: Vec<AuthorityDiscoveryId>,
pub validator_babe_id: Vec<AuthorityId>,
pub validator_assignment_id: Vec<AssignmentId>,
pub key_seeds: Vec<String>,
pub peer_ids: Vec<PeerId>,
pub peer_id_to_authority: HashMap<PeerId, AuthorityDiscoveryId>,
}
impl TestConfiguration {
@@ -162,18 +200,45 @@ impl TestConfiguration {
pub fn generate_authorities(&self) -> TestAuthorities {
let keyring = Keyring::default();
let keys = (0..self.n_validators)
.map(|peer_index| keyring.sr25519_new(format!("Node{}", peer_index)))
let key_seeds = (0..self.n_validators)
.map(|peer_index| format!("//Node{}", peer_index))
.collect_vec();
let keys = key_seeds
.iter()
.map(|seed| keyring.sr25519_new(seed.as_str()))
.collect::<Vec<_>>();
// Generate `AuthorityDiscoveryId`` for each peer
// Generate keys and peer IDs in each of the formats needed by the tests.
let validator_public: Vec<ValidatorId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let validator_authority_id: Vec<AuthorityDiscoveryId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
TestAuthorities { keyring, validator_public, validator_authority_id }
let validator_babe_id: Vec<AuthorityId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let validator_assignment_id: Vec<AssignmentId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let peer_ids: Vec<PeerId> = keys.iter().map(|_| PeerId::random()).collect::<Vec<_>>();
let peer_id_to_authority = peer_ids
.iter()
.zip(validator_authority_id.iter())
.map(|(peer_id, authority_id)| (*peer_id, authority_id.clone()))
.collect();
TestAuthorities {
keyring,
validator_public,
validator_authority_id,
peer_ids,
validator_babe_id,
validator_assignment_id,
key_seeds,
peer_id_to_authority,
}
}
/// An unconstrained standard configuration matching Polkadot/Kusama
@@ -199,6 +264,11 @@ impl TestConfiguration {
min_pov_size,
max_pov_size,
connectivity: 100,
needed_approvals: default_needed_approvals(),
n_delay_tranches: default_n_delay_tranches(),
no_show_slots: default_no_show_slots(),
relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
}
}
@@ -223,6 +293,11 @@ impl TestConfiguration {
min_pov_size,
max_pov_size,
connectivity: 95,
needed_approvals: default_needed_approvals(),
n_delay_tranches: default_n_delay_tranches(),
no_show_slots: default_no_show_slots(),
relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
}
}
@@ -247,6 +322,11 @@ impl TestConfiguration {
min_pov_size,
max_pov_size,
connectivity: 67,
needed_approvals: default_needed_approvals(),
n_delay_tranches: default_n_delay_tranches(),
no_show_slots: default_no_show_slots(),
relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
}
}
}
@@ -26,7 +26,7 @@ use prometheus::{
};
use std::fmt::Display;
#[derive(Default)]
#[derive(Default, Debug)]
pub struct MetricCollection(Vec<TestMetric>);
impl From<Vec<TestMetric>> for MetricCollection {
@@ -49,6 +49,11 @@ impl MetricCollection {
.sum()
}
/// Returns `true` if the sum of all `metric_name` entries is lower than `value`.
pub fn metric_lower_than(&self, metric_name: &str, value: f64) -> bool {
self.sum_by(metric_name) < value
}
pub fn subset_with_label_value(&self, label_name: &str, label_value: &str) -> MetricCollection {
self.0
.iter()
@@ -163,7 +168,7 @@ pub fn parse_metrics(registry: &Registry) -> MetricCollection {
name: h_name,
label_names,
label_values,
value: h.get_sample_sum(),
value: h.get_sample_count() as f64,
});
},
MetricType::SUMMARY => {
@@ -243,6 +243,11 @@ impl TestEnvironment {
&self.network
}
/// Returns a reference to the overseer handle.
pub fn overseer_handle(&self) -> &OverseerHandle {
&self.overseer_handle
}
/// Returns the Prometheus registry.
pub fn registry(&self) -> &Registry {
&self.dependencies.registry
@@ -311,18 +316,32 @@ impl TestEnvironment {
self.overseer_handle.stop().await;
}
/// Blocks until `metric_name` == `value`
pub async fn wait_until_metric_eq(&self, metric_name: &str, value: usize) {
let value = value as f64;
/// Returns `true` if the sum of all `metric_name` entries in `registry` is lower than `value`.
pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool {
let test_metrics = super::display::parse_metrics(registry);
test_metrics.metric_lower_than(metric_name, value)
}
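/// Blocks until `condition` returns `true` for the current value of `metric_name`,
/// optionally restricted to entries matching the `label` name/value pair.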
pub async fn wait_until_metric(
&self,
metric_name: &str,
label: Option<(&str, &str)>,
condition: impl Fn(f64) -> bool,
) {
loop {
let test_metrics = super::display::parse_metrics(self.registry());
let test_metrics = if let Some((label_name, label_value)) = label {
super::display::parse_metrics(self.registry())
.subset_with_label_value(label_name, label_value)
} else {
super::display::parse_metrics(self.registry())
};
let current_value = test_metrics.sum_by(metric_name);
gum::debug!(target: LOG_TARGET, metric_name, current_value, value, "Waiting for metric");
if current_value == value {
gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric");
if condition(current_value) {
break
}
// Check value every 50ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
@@ -34,13 +34,17 @@ impl Default for Keyring {
}
impl Keyring {
pub fn sr25519_new(&self, name: String) -> Public {
pub fn sr25519_new(&self, seed: &str) -> Public {
self.keystore
.sr25519_generate_new(ValidatorId::ID, Some(&format!("//{}", name)))
.sr25519_generate_new(ValidatorId::ID, Some(seed))
.expect("Insert key into keystore")
}
pub fn keystore(&self) -> Arc<dyn Keystore> {
self.keystore.clone()
}
pub fn keystore_ref(&self) -> &LocalKeystore {
self.keystore.as_ref()
}
}
@@ -16,6 +16,7 @@
//!
//! A generic runtime api subsystem mockup suitable to be used in benchmarks.
use itertools::Itertools;
use polkadot_primitives::Header;
use polkadot_node_subsystem::{
@@ -38,6 +39,12 @@ pub struct MockChainApi {
state: ChainApiState,
}
impl ChainApiState {
fn get_header_by_number(&self, requested_number: u32) -> Option<&Header> {
self.block_headers.values().find(|header| header.number == requested_number)
}
}
impl MockChainApi {
pub fn new(state: ChainApiState) -> MockChainApi {
Self { state }
@@ -77,9 +84,44 @@ impl MockChainApi {
.expect("Relay chain block hashes are known"),
)));
},
ChainApiMessage::Ancestors { hash: _hash, k: _k, response_channel } => {
// For our purposes, no ancestors is fine.
let _ = response_channel.send(Ok(Vec::new()));
ChainApiMessage::FinalizedBlockNumber(val) => {
val.send(Ok(0)).unwrap();
},
ChainApiMessage::FinalizedBlockHash(requested_number, sender) => {
let hash = self
.state
.get_header_by_number(requested_number)
.expect("Unknown block number")
.hash();
sender.send(Ok(Some(hash))).unwrap();
},
ChainApiMessage::BlockNumber(requested_hash, sender) => {
sender
.send(Ok(Some(
self.state
.block_headers
.get(&requested_hash)
.expect("Unknown block hash")
.number,
)))
.unwrap();
},
ChainApiMessage::Ancestors { hash, k: _, response_channel } => {
let block_number = self
.state
.block_headers
.get(&hash)
.expect("Unknown block hash")
.number;
let ancestors = self
.state
.block_headers
.iter()
.filter(|(_, header)| header.number < block_number)
.sorted_by(|a, b| a.1.number.cmp(&b.1.number))
.map(|(hash, _)| *hash)
.collect_vec();
response_channel.send(Ok(ancestors)).unwrap();
},
_ => {
unimplemented!("Unexpected chain-api message")
@@ -37,7 +37,7 @@ impl HeadSupportsParachains for AlwaysSupportsParachains {
// An orchestra with dummy subsystems
macro_rules! dummy_builder {
($spawn_task_handle: ident) => {{
($spawn_task_handle: ident, $metrics: ident) => {{
use super::core::mock::dummy::*;
// Initialize a mock overseer.
@@ -69,10 +69,24 @@ macro_rules! dummy_builder {
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.metrics(Default::default())
.metrics($metrics)
.supports_parachains(AlwaysSupportsParachains {})
.spawner(SpawnGlue($spawn_task_handle))
}};
}
pub(crate) use dummy_builder;
use sp_consensus::SyncOracle;
#[derive(Clone)]
pub struct TestSyncOracle {}
impl SyncOracle for TestSyncOracle {
fn is_major_syncing(&self) -> bool {
false
}
fn is_offline(&self) -> bool {
unimplemented!("not used by subsystem benchmarks")
}
}
@@ -18,11 +18,11 @@
//! the emulated network.
use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt};
use polkadot_node_subsystem_types::{
messages::{BitfieldDistributionMessage, NetworkBridgeEvent},
messages::{ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent},
OverseerSignal,
};
use sc_network::{request_responses::ProtocolConfig, PeerId, RequestFailure};
use sc_network::{request_responses::ProtocolConfig, RequestFailure};
use polkadot_node_subsystem::{
messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError,
@@ -30,8 +30,9 @@ use polkadot_node_subsystem::{
use polkadot_node_network_protocol::Versioned;
use crate::core::network::{
NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt,
use crate::core::{
configuration::TestAuthorities,
network::{NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt},
};
const LOG_TARGET: &str = "subsystem-bench::network-bridge";
@@ -44,6 +45,8 @@ pub struct MockNetworkBridgeTx {
network: NetworkEmulatorHandle,
/// A channel to the network interface,
to_network_interface: UnboundedSender<NetworkMessage>,
/// Test authorities
test_authorithies: TestAuthorities,
}
/// A mock of the network bridge tx subsystem.
@@ -58,8 +61,9 @@ impl MockNetworkBridgeTx {
pub fn new(
network: NetworkEmulatorHandle,
to_network_interface: UnboundedSender<NetworkMessage>,
test_authorithies: TestAuthorities,
) -> MockNetworkBridgeTx {
Self { network, to_network_interface }
Self { network, to_network_interface, test_authorithies }
}
}
@@ -126,9 +130,21 @@ impl MockNetworkBridgeTx {
NetworkBridgeTxMessage::ReportPeer(_) => {
// ignore reputation changes
},
_ => {
unimplemented!("Unexpected network bridge message")
NetworkBridgeTxMessage::SendValidationMessage(peers, message) => {
for peer in peers {
self.to_network_interface
.unbounded_send(NetworkMessage::MessageFromNode(
self.test_authorithies
.peer_id_to_authority
.get(&peer)
.unwrap()
.clone(),
message.clone(),
))
.expect("Should not fail");
}
},
_ => unimplemented!("Unexpected network bridge message"),
},
}
}
@@ -145,16 +161,23 @@ impl MockNetworkBridgeRx {
maybe_peer_message = from_network_interface.next() => {
if let Some(message) = maybe_peer_message {
match message {
NetworkMessage::MessageFromPeer(message) => match message {
NetworkMessage::MessageFromPeer(peer_id, message) => match message {
Versioned::V2(
polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
bitfield,
),
) => {
ctx.send_message(
BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(PeerId::random(), polkadot_node_network_protocol::Versioned::V2(bitfield)))
BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V2(bitfield)))
).await;
},
Versioned::V3(
polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg)
) => {
ctx.send_message(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg)))
).await;
}
_ => {
unimplemented!("We only talk v2 network protocol")
},
@@ -16,8 +16,10 @@
//!
//! A generic runtime api subsystem mockup suitable to be used in benchmarks.
use itertools::Itertools;
use polkadot_primitives::{
CandidateReceipt, CoreState, GroupIndex, IndexedVec, OccupiedCore, SessionInfo, ValidatorIndex,
vstaging::NodeFeatures, CandidateEvent, CandidateReceipt, CoreState, GroupIndex, IndexedVec,
OccupiedCore, SessionIndex, SessionInfo, ValidatorIndex,
};
use bitvec::prelude::BitVec;
@@ -26,6 +28,7 @@ use polkadot_node_subsystem::{
overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::OverseerSignal;
use sp_consensus_babe::Epoch as BabeEpoch;
use sp_core::H256;
use std::collections::HashMap;
@@ -38,8 +41,13 @@ const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock";
pub struct RuntimeApiState {
// All authorities in the test,
authorities: TestAuthorities,
// Candidate
// Candidate hashes per block
candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
// Included candidates per block
included_candidates: HashMap<H256, Vec<CandidateEvent>>,
babe_epoch: Option<BabeEpoch>,
// The session index returned for `SessionIndexForChild` requests.
session_index: SessionIndex,
}
/// A mocked `runtime-api` subsystem.
@@ -53,34 +61,57 @@ impl MockRuntimeApi {
config: TestConfiguration,
authorities: TestAuthorities,
candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
included_candidates: HashMap<H256, Vec<CandidateEvent>>,
babe_epoch: Option<BabeEpoch>,
session_index: SessionIndex,
) -> MockRuntimeApi {
Self { state: RuntimeApiState { authorities, candidate_hashes }, config }
Self {
state: RuntimeApiState {
authorities,
candidate_hashes,
included_candidates,
babe_epoch,
session_index,
},
config,
}
}
fn session_info(&self) -> SessionInfo {
let all_validators = (0..self.config.n_validators)
.map(|i| ValidatorIndex(i as _))
.collect::<Vec<_>>();
session_info_for_peers(&self.config, &self.state.authorities)
}
}
let validator_groups = all_validators
.chunks(self.config.max_validators_per_core)
.map(Vec::from)
.collect::<Vec<_>>();
SessionInfo {
validators: self.state.authorities.validator_public.clone().into(),
discovery_keys: self.state.authorities.validator_authority_id.clone(),
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(validator_groups),
assignment_keys: vec![],
n_cores: self.config.n_cores as u32,
zeroth_delay_tranche_width: 0,
relay_vrf_modulo_samples: 0,
n_delay_tranches: 0,
no_show_slots: 0,
needed_approvals: 0,
active_validator_indices: vec![],
dispute_period: 6,
random_seed: [0u8; 32],
}
/// Generates a test session info with all passed authorities as consensus validators.
pub fn session_info_for_peers(
configuration: &TestConfiguration,
authorities: &TestAuthorities,
) -> SessionInfo {
let all_validators = (0..configuration.n_validators)
.map(|i| ValidatorIndex(i as _))
.collect::<Vec<_>>();
let validator_groups = all_validators
.chunks(configuration.max_validators_per_core)
.map(Vec::from)
.collect::<Vec<_>>();
SessionInfo {
validators: authorities.validator_public.iter().cloned().collect(),
discovery_keys: authorities.validator_authority_id.to_vec(),
assignment_keys: authorities.validator_assignment_id.to_vec(),
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(validator_groups),
n_cores: configuration.n_cores as u32,
needed_approvals: configuration.needed_approvals as u32,
zeroth_delay_tranche_width: configuration.zeroth_delay_tranche_width as u32,
relay_vrf_modulo_samples: configuration.relay_vrf_modulo_samples as u32,
n_delay_tranches: configuration.n_delay_tranches as u32,
no_show_slots: configuration.no_show_slots as u32,
active_validator_indices: (0..authorities.validator_authority_id.len())
.map(|index| ValidatorIndex(index as u32))
.collect_vec(),
dispute_period: 6,
random_seed: [0u8; 32],
}
}
@@ -110,6 +141,13 @@ impl MockRuntimeApi {
gum::debug!(target: LOG_TARGET, msg=?msg, "recv message");
match msg {
RuntimeApiMessage::Request(
request,
RuntimeApiRequest::CandidateEvents(sender),
) => {
let candidate_events = self.state.included_candidates.get(&request);
let _ = sender.send(Ok(candidate_events.cloned().unwrap_or_default()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionInfo(_session_index, sender),
@@ -122,6 +160,12 @@ impl MockRuntimeApi {
) => {
let _ = sender.send(Ok(Some(Default::default())));
},
RuntimeApiMessage::Request(
_request,
RuntimeApiRequest::NodeFeatures(_session_index, sender),
) => {
let _ = sender.send(Ok(NodeFeatures::EMPTY));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::Validators(sender),
@@ -129,18 +173,12 @@ impl MockRuntimeApi {
let _ =
sender.send(Ok(self.state.authorities.validator_public.clone()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::CandidateEvents(sender),
) => {
let _ = sender.send(Ok(Default::default()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionIndexForChild(sender),
) => {
// Session is always the same.
let _ = sender.send(Ok(0));
let _ = sender.send(Ok(self.state.session_index));
},
RuntimeApiMessage::Request(
block_hash,
@@ -176,10 +214,14 @@ impl MockRuntimeApi {
let _ = sender.send(Ok(cores));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::NodeFeatures(_session_index, sender),
_request,
RuntimeApiRequest::CurrentBabeEpoch(sender),
) => {
let _ = sender.send(Ok(Default::default()));
let _ = sender.send(Ok(self
.state
.babe_epoch
.clone()
.expect("Babe epoch unpopulated")));
},
// Long term TODO: implement more as needed.
message => {
@@ -15,6 +15,8 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
const LOG_TARGET: &str = "subsystem-bench::core";
// The validator index that represents the node under test.
pub const NODE_UNDER_TEST: u32 = 0;
pub mod configuration;
pub mod display;
@@ -10,7 +10,6 @@
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//!
@@ -48,17 +47,21 @@ use futures::{
stream::FuturesUnordered,
};
use itertools::Itertools;
use net_protocol::{
peer_set::{ProtocolVersion, ValidationVersion},
request_response::{Recipient, Requests, ResponseSender},
VersionedValidationProtocol,
ObservedRole, VersionedValidationProtocol,
};
use parity_scale_codec::Encode;
use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, NetworkBridgeEvent};
use polkadot_overseer::AllMessages;
use polkadot_primitives::AuthorityDiscoveryId;
use prometheus_endpoint::U64;
use rand::{seq::SliceRandom, thread_rng};
use sc_network::{
request_responses::{IncomingRequest, OutgoingResponse},
RequestFailure,
PeerId, RequestFailure,
};
use sc_service::SpawnTaskHandle;
use std::{
@@ -142,7 +145,7 @@ impl RateLimit {
/// peer(`AuthorityDiscoveryId``).
pub enum NetworkMessage {
/// A gossip message from peer to node.
MessageFromPeer(VersionedValidationProtocol),
MessageFromPeer(PeerId, VersionedValidationProtocol),
/// A gossip message from node to a peer.
MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol),
/// A request originating from our node
@@ -155,9 +158,9 @@ impl NetworkMessage {
/// Returns the size of the encoded message or request
pub fn size(&self) -> usize {
match &self {
NetworkMessage::MessageFromPeer(Versioned::V2(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(Versioned::V1(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(Versioned::V3(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(_, Versioned::V2(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(_, Versioned::V1(message)) => message.encoded_size(),
NetworkMessage::MessageFromPeer(_, Versioned::V3(message)) => message.encoded_size(),
NetworkMessage::MessageFromNode(_peer_id, Versioned::V2(message)) =>
message.encoded_size(),
NetworkMessage::MessageFromNode(_peer_id, Versioned::V1(message)) =>
@@ -430,6 +433,7 @@ pub struct EmulatedPeerHandle {
messages_tx: UnboundedSender<NetworkMessage>,
/// Send actions to be performed by the peer.
actions_tx: UnboundedSender<NetworkMessage>,
peer_id: PeerId,
}
impl EmulatedPeerHandle {
@@ -441,7 +445,7 @@ impl EmulatedPeerHandle {
/// Send a message to the node.
pub fn send_message(&self, message: VersionedValidationProtocol) {
self.actions_tx
.unbounded_send(NetworkMessage::MessageFromPeer(message))
.unbounded_send(NetworkMessage::MessageFromPeer(self.peer_id, message))
.expect("Peer action channel hangup");
}
@@ -613,6 +617,7 @@ pub fn new_peer(
stats: Arc<PeerEmulatorStats>,
to_network_interface: UnboundedSender<NetworkMessage>,
latency_ms: usize,
peer_id: PeerId,
) -> EmulatedPeerHandle {
let (messages_tx, messages_rx) = mpsc::unbounded::<NetworkMessage>();
let (actions_tx, actions_rx) = mpsc::unbounded::<NetworkMessage>();
@@ -641,7 +646,7 @@ pub fn new_peer(
.boxed(),
);
EmulatedPeerHandle { messages_tx, actions_tx }
EmulatedPeerHandle { messages_tx, actions_tx, peer_id }
}
/// Book keeping of sent and received bytes.
@@ -719,6 +724,28 @@ pub struct NetworkEmulatorHandle {
validator_authority_ids: HashMap<AuthorityDiscoveryId, usize>,
}
impl NetworkEmulatorHandle {
/// Generates `PeerConnected` messages for every connected peer in the emulated network.
pub fn generate_peer_connected(&self) -> Vec<AllMessages> {
self.peers
.iter()
.filter(|peer| peer.is_connected())
.map(|peer| {
let network = NetworkBridgeEvent::PeerConnected(
peer.handle().peer_id,
ObservedRole::Full,
ProtocolVersion::from(ValidationVersion::V3),
None,
);
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(
network,
))
})
.collect_vec()
}
}
/// Create a new emulated network based on `config`.
/// Each emulated peer will run the specified `handlers` to process incoming messages.
pub fn new_network(
@@ -753,6 +780,7 @@ pub fn new_network(
stats,
to_network_interface.clone(),
random_latency(config.latency.as_ref()),
*authorities.peer_ids.get(peer_index).unwrap(),
)),
)
})
@@ -760,10 +788,14 @@ pub fn new_network(
let connected_count = config.connected_count();
let (_connected, to_disconnect) = peers.partial_shuffle(&mut thread_rng(), connected_count);
let mut peer_indices = (0..n_peers).collect_vec();
let (_connected, to_disconnect) =
peer_indices.partial_shuffle(&mut thread_rng(), connected_count);
for peer in to_disconnect {
peer.disconnect();
// The node under test is always marked as disconnected.
peers[NODE_UNDER_TEST as usize].disconnect();
for peer in to_disconnect.iter().skip(1) {
peers[*peer].disconnect();
}
gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {}", connected_count).bright_black());
@@ -786,6 +818,7 @@ pub fn new_network(
}
/// Errors that can happen when sending data to emulated peers.
#[derive(Clone, Debug)]
pub enum EmulatedPeerError {
NotConnected,
}
@@ -26,6 +26,7 @@ use pyroscope_pprofrs::{pprof_backend, PprofConfig};
use std::path::Path;
pub(crate) mod approval;
pub(crate) mod availability;
pub(crate) mod cli;
pub(crate) mod core;
@@ -43,7 +44,7 @@ use core::{
use clap_num::number_range;
use crate::core::display::display_configuration;
use crate::{approval::bench_approvals, core::display::display_configuration};
fn le_100(s: &str) -> Result<usize, String> {
number_range(s, 0, 100)
@@ -174,6 +175,12 @@ impl BenchCli {
&mut env, state,
));
},
TestObjective::ApprovalVoting(ref options) => {
let (mut env, state) =
approval::prepare_test(test_config.clone(), options.clone());
env.runtime().block_on(bench_approvals(&mut env, state));
},
TestObjective::DataAvailabilityWrite => {
let mut state = TestState::new(&test_config);
let (mut env, _protocol_config) = prepare_test(test_config, &mut state);
@@ -181,13 +188,16 @@ impl BenchCli {
&mut env, state,
));
},
_ => gum::error!("Invalid test objective in sequence"),
TestObjective::TestSequence(_) => todo!(),
TestObjective::Unimplemented => todo!(),
}
}
return Ok(())
},
TestObjective::DataAvailabilityRead(ref _options) => self.create_test_configuration(),
TestObjective::DataAvailabilityWrite => self.create_test_configuration(),
TestObjective::ApprovalVoting(_) => todo!(),
TestObjective::Unimplemented => todo!(),
};
let mut latency_config = test_config.latency.clone().unwrap_or_default();
@@ -232,6 +242,8 @@ impl BenchCli {
.block_on(availability::benchmark_availability_write(&mut env, state));
},
TestObjective::TestSequence(_options) => {},
TestObjective::ApprovalVoting(_) => todo!(),
TestObjective::Unimplemented => todo!(),
}
if let Some(agent_running) = agent_running {