[subsystem-benchmarks] Fix availability-write regression tests (#3698)

Adds availability-write regression tests.
The results for the `availability-distribution` subsystem are volatile,
so I had to reduce the precision of the test.
This commit is contained in:
Andrei Eres
2024-03-25 16:57:46 +01:00
committed by GitHub
parent 9d122401f1
commit cc1e6ac301
14 changed files with 436 additions and 459 deletions
@@ -136,31 +136,29 @@ impl BenchCli {
let usage = match objective {
TestObjective::DataAvailabilityRead(opts) => {
let mut state = availability::TestState::new(&test_config);
let state = availability::TestState::new(&test_config);
let (mut env, _protocol_config) = availability::prepare_test(
test_config,
&mut state,
&state,
availability::TestDataAvailability::Read(opts),
true,
);
env.runtime().block_on(availability::benchmark_availability_read(
&benchmark_name,
&mut env,
state,
&state,
))
},
TestObjective::DataAvailabilityWrite => {
let mut state = availability::TestState::new(&test_config);
let state = availability::TestState::new(&test_config);
let (mut env, _protocol_config) = availability::prepare_test(
test_config,
&mut state,
&state,
availability::TestDataAvailability::Write,
true,
);
env.runtime().block_on(availability::benchmark_availability_write(
&benchmark_name,
&mut env,
state,
&state,
))
},
TestObjective::ApprovalVoting(ref options) => {
@@ -15,11 +15,11 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
configuration::TestConfiguration,
availability::av_store_helpers::new_av_store,
dummy_builder,
environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH},
mock::{
av_store::{self, MockAvailabilityStore},
av_store::{self, MockAvailabilityStore, NetworkAvailabilityState},
chain_api::{ChainApiState, MockChainApi},
network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx},
runtime_api::{self, MockRuntimeApi},
@@ -28,12 +28,8 @@ use crate::{
network::new_network,
usage::BenchmarkUsage,
};
use av_store::NetworkAvailabilityState;
use av_store_helpers::new_av_store;
use bitvec::bitvec;
use colored::Colorize;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use parity_scale_codec::Encode;
use polkadot_availability_bitfield_distribution::BitfieldDistribution;
use polkadot_availability_distribution::{
@@ -43,37 +39,27 @@ use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
use polkadot_node_metrics::metrics::Metrics;
use polkadot_node_network_protocol::{
request_response::{v1::ChunkFetchingRequest, IncomingRequest, ReqProtocolNames},
OurView, Versioned, VersionedValidationProtocol,
request_response::{IncomingRequest, ReqProtocolNames},
OurView,
};
use polkadot_node_primitives::{AvailableData, BlockData, ErasureChunk, PoV};
use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityRecoveryMessage},
Overseer, OverseerConnector, SpawnGlue,
};
use polkadot_node_subsystem_test_helpers::{
derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info,
};
use polkadot_node_subsystem_types::{
messages::{AvailabilityStoreMessage, NetworkBridgeEvent},
Span,
};
use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle};
use polkadot_primitives::{
AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData,
Header, PersistedValidationData, Signed, SigningContext, ValidatorIndex,
};
use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash};
use sc_network::{
request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig},
PeerId,
};
use polkadot_primitives::GroupIndex;
use sc_network::request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig};
use sc_service::SpawnTaskHandle;
use serde::{Deserialize, Serialize};
use sp_core::H256;
use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant};
use std::{ops::Sub, sync::Arc, time::Instant};
pub use test_state::TestState;
mod av_store_helpers;
mod test_state;
const LOG_TARGET: &str = "subsystem-bench::availability";
@@ -149,94 +135,48 @@ fn build_overseer_for_availability_write(
(overseer, OverseerHandle::new(raw_handle))
}
/// Takes a test configuration and uses it to create the `TestEnvironment`.
pub fn prepare_test(
config: TestConfiguration,
state: &mut TestState,
state: &TestState,
mode: TestDataAvailability,
with_prometheus_endpoint: bool,
) -> (TestEnvironment, Vec<ProtocolConfig>) {
prepare_test_inner(
config,
state,
mode,
TestEnvironmentDependencies::default(),
with_prometheus_endpoint,
)
}
fn prepare_test_inner(
config: TestConfiguration,
state: &mut TestState,
mode: TestDataAvailability,
dependencies: TestEnvironmentDependencies,
with_prometheus_endpoint: bool,
) -> (TestEnvironment, Vec<ProtocolConfig>) {
// Generate test authorities.
let test_authorities = config.generate_authorities();
let mut candidate_hashes: HashMap<H256, Vec<CandidateReceipt>> = HashMap::new();
// Prepare per block candidates.
// Genesis block is always finalized, so we start at 1.
for block_num in 1..=config.num_blocks {
for _ in 0..config.n_cores {
candidate_hashes
.entry(Hash::repeat_byte(block_num as u8))
.or_default()
.push(state.next_candidate().expect("Cycle iterator"))
}
// First candidate is our backed candidate.
state.backed_candidates.push(
candidate_hashes
.get(&Hash::repeat_byte(block_num as u8))
.expect("just inserted above")
.first()
.expect("just inserted above")
.clone(),
);
}
let runtime_api = runtime_api::MockRuntimeApi::new(
config.clone(),
test_authorities.clone(),
candidate_hashes,
Default::default(),
Default::default(),
0,
);
let (collation_req_receiver, collation_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
let (pov_req_receiver, pov_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
let (chunk_req_receiver, chunk_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
let req_cfgs = vec![collation_req_cfg, pov_req_cfg];
let dependencies = TestEnvironmentDependencies::default();
let availability_state = NetworkAvailabilityState {
candidate_hashes: state.candidate_hashes.clone(),
available_data: state.available_data.clone(),
chunks: state.chunks.clone(),
};
let mut req_cfgs = Vec::new();
let (collation_req_receiver, collation_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
req_cfgs.push(collation_req_cfg);
let (pov_req_receiver, pov_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
let (chunk_req_receiver, chunk_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
req_cfgs.push(pov_req_cfg);
let (network, network_interface, network_receiver) =
new_network(&config, &dependencies, &test_authorities, vec![Arc::new(availability_state)]);
let (network, network_interface, network_receiver) = new_network(
&state.config,
&dependencies,
&state.test_authorities,
vec![Arc::new(availability_state.clone())],
);
let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
network.clone(),
network_interface.subsystem_sender(),
test_authorities.clone(),
state.test_authorities.clone(),
);
let network_bridge_rx =
network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg.clone()));
network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg));
let runtime_api = runtime_api::MockRuntimeApi::new(
state.config.clone(),
state.test_authorities.clone(),
state.candidate_receipts.clone(),
Default::default(),
Default::default(),
0,
);
let (overseer, overseer_handle) = match &mode {
TestDataAvailability::Read(options) => {
@@ -271,27 +211,12 @@ fn prepare_test_inner(
},
TestDataAvailability::Write => {
let availability_distribution = AvailabilityDistributionSubsystem::new(
test_authorities.keyring.keystore(),
state.test_authorities.keyring.keystore(),
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Metrics::try_register(&dependencies.registry).unwrap(),
);
let block_headers = (1..=config.num_blocks)
.map(|block_number| {
(
Hash::repeat_byte(block_number as u8),
Header {
digest: Default::default(),
number: block_number as BlockNumber,
parent_hash: Default::default(),
extrinsics_root: Default::default(),
state_root: Default::default(),
},
)
})
.collect::<HashMap<_, _>>();
let chain_api_state = ChainApiState { block_headers };
let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() };
let chain_api = MockChainApi::new(chain_api_state);
let bitfield_distribution =
BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap());
@@ -311,167 +236,42 @@ fn prepare_test_inner(
(
TestEnvironment::new(
dependencies,
config,
state.config.clone(),
network,
overseer,
overseer_handle,
test_authorities,
state.test_authorities.clone(),
with_prometheus_endpoint,
),
req_cfgs,
)
}
#[derive(Clone)]
pub struct TestState {
// Full test configuration
config: TestConfiguration,
// A cycle iterator on all PoV sizes used in the test.
pov_sizes: Cycle<std::vec::IntoIter<usize>>,
// Generated candidate receipts to be used in the test
candidates: Cycle<std::vec::IntoIter<CandidateReceipt>>,
// Map from pov size to candidate index
pov_size_to_candidate: HashMap<usize, usize>,
// Map from generated candidate hashes to candidate index in `available_data`
// and `chunks`.
candidate_hashes: HashMap<CandidateHash, usize>,
// Per candidate index receipts.
candidate_receipt_templates: Vec<CandidateReceipt>,
// Per candidate index `AvailableData`
available_data: Vec<AvailableData>,
// Per candiadte index chunks
chunks: Vec<Vec<ErasureChunk>>,
// Per relay chain block - candidate backed by our backing group
backed_candidates: Vec<CandidateReceipt>,
}
impl TestState {
pub fn next_candidate(&mut self) -> Option<CandidateReceipt> {
let candidate = self.candidates.next();
let candidate_hash = candidate.as_ref().unwrap().hash();
gum::trace!(target: LOG_TARGET, "Next candidate selected {:?}", candidate_hash);
candidate
}
/// Generate candidates to be used in the test.
fn generate_candidates(&mut self) {
let count = self.config.n_cores * self.config.num_blocks;
gum::info!(target: LOG_TARGET,"{}", format!("Pre-generating {} candidates.", count).bright_blue());
// Generate all candidates
self.candidates = (0..count)
.map(|index| {
let pov_size = self.pov_sizes.next().expect("This is a cycle; qed");
let candidate_index = *self
.pov_size_to_candidate
.get(&pov_size)
.expect("pov_size always exists; qed");
let mut candidate_receipt =
self.candidate_receipt_templates[candidate_index].clone();
// Make it unique.
candidate_receipt.descriptor.relay_parent = Hash::from_low_u64_be(index as u64);
// Store the new candidate in the state
self.candidate_hashes.insert(candidate_receipt.hash(), candidate_index);
gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_receipt.hash(), "new candidate");
candidate_receipt
})
.collect::<Vec<_>>()
.into_iter()
.cycle();
}
pub fn new(config: &TestConfiguration) -> Self {
let config = config.clone();
let mut chunks = Vec::new();
let mut available_data = Vec::new();
let mut candidate_receipt_templates = Vec::new();
let mut pov_size_to_candidate = HashMap::new();
// we use it for all candidates.
let persisted_validation_data = PersistedValidationData {
parent_head: HeadData(vec![7, 8, 9]),
relay_parent_number: Default::default(),
max_pov_size: 1024,
relay_parent_storage_root: Default::default(),
};
// For each unique pov we create a candidate receipt.
for (index, pov_size) in config.pov_sizes().iter().cloned().unique().enumerate() {
gum::info!(target: LOG_TARGET, index, pov_size, "{}", "Generating template candidate".bright_blue());
let mut candidate_receipt = dummy_candidate_receipt(dummy_hash());
let pov = PoV { block_data: BlockData(vec![index as u8; pov_size]) };
let new_available_data = AvailableData {
validation_data: persisted_validation_data.clone(),
pov: Arc::new(pov),
};
let (new_chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
config.n_validators,
&new_available_data,
|_, _| {},
);
candidate_receipt.descriptor.erasure_root = erasure_root;
chunks.push(new_chunks);
available_data.push(new_available_data);
pov_size_to_candidate.insert(pov_size, index);
candidate_receipt_templates.push(candidate_receipt);
}
gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue());
let mut _self = Self {
available_data,
candidate_receipt_templates,
chunks,
pov_size_to_candidate,
pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(),
candidate_hashes: HashMap::new(),
candidates: Vec::new().into_iter().cycle(),
backed_candidates: Vec::new(),
config,
};
_self.generate_candidates();
_self
}
pub fn backed_candidates(&mut self) -> &mut Vec<CandidateReceipt> {
&mut self.backed_candidates
}
}
pub async fn benchmark_availability_read(
benchmark_name: &str,
env: &mut TestEnvironment,
mut state: TestState,
state: &TestState,
) -> BenchmarkUsage {
let config = env.config().clone();
env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await;
let test_start = Instant::now();
let mut batch = FuturesUnordered::new();
let mut availability_bytes = 0u128;
env.metrics().set_n_validators(config.n_validators);
env.metrics().set_n_cores(config.n_cores);
for block_num in 1..=env.config().num_blocks {
let mut batch = FuturesUnordered::new();
let mut availability_bytes = 0u128;
let mut candidates = state.candidates.clone();
let test_start = Instant::now();
for block_info in state.block_infos.iter() {
let block_num = block_info.number as usize;
gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks);
env.metrics().set_current_block(block_num);
let block_start_ts = Instant::now();
env.import_block(block_info.clone()).await;
for candidate_num in 0..config.n_cores as u64 {
let candidate =
state.next_candidate().expect("We always send up to n_cores*num_blocks; qed");
candidates.next().expect("We always send up to n_cores*num_blocks; qed");
let (tx, rx) = oneshot::channel();
batch.push(rx);
@@ -519,7 +319,7 @@ pub async fn benchmark_availability_read(
pub async fn benchmark_availability_write(
benchmark_name: &str,
env: &mut TestEnvironment,
mut state: TestState,
state: &TestState,
) -> BenchmarkUsage {
let config = env.config().clone();
@@ -527,7 +327,7 @@ pub async fn benchmark_availability_write(
env.metrics().set_n_cores(config.n_cores);
gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ...");
for backed_candidate in state.backed_candidates().clone() {
for backed_candidate in state.backed_candidates.clone() {
let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap();
let available_data = state.available_data[candidate_index].clone();
let (tx, rx) = oneshot::channel();
@@ -550,15 +350,14 @@ pub async fn benchmark_availability_write(
gum::info!(target: LOG_TARGET, "Done");
let test_start = Instant::now();
for block_num in 1..=env.config().num_blocks {
for block_info in state.block_infos.iter() {
let block_num = block_info.number as usize;
gum::info!(target: LOG_TARGET, "Current block #{}", block_num);
env.metrics().set_current_block(block_num);
let block_start_ts = Instant::now();
let relay_block_hash = Hash::repeat_byte(block_num as u8);
env.import_block(new_block_import_info(relay_block_hash, block_num as BlockNumber))
.await;
let relay_block_hash = block_info.hash;
env.import_block(block_info.clone()).await;
// Inform bitfield distribution about our view of current test block
let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate(
@@ -569,20 +368,13 @@ pub async fn benchmark_availability_write(
let chunk_fetch_start_ts = Instant::now();
// Request chunks of our own backed candidate from all other validators.
let mut receivers = Vec::new();
for index in 1..config.n_validators {
let payloads = state.chunk_fetching_requests.get(block_num - 1).expect("pregenerated");
let receivers = (1..config.n_validators).filter_map(|index| {
let (pending_response, pending_response_receiver) = oneshot::channel();
let request = RawIncomingRequest {
peer: PeerId::random(),
payload: ChunkFetchingRequest {
candidate_hash: state.backed_candidates()[block_num - 1].hash(),
index: ValidatorIndex(index as u32),
}
.encode(),
pending_response,
};
let peer_id = *env.authorities().peer_ids.get(index).expect("all validators have ids");
let payload = payloads.get(index).expect("pregenerated").clone();
let request = RawIncomingRequest { peer: peer_id, payload, pending_response };
let peer = env
.authorities()
.validator_authority_id
@@ -592,59 +384,39 @@ pub async fn benchmark_availability_write(
if env.network().is_peer_connected(peer) &&
env.network().send_request_from_peer(peer, request).is_ok()
{
receivers.push(pending_response_receiver);
Some(pending_response_receiver)
} else {
None
}
}
});
gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ...");
for receiver in receivers.into_iter() {
let response = receiver.await.expect("Chunk is always served successfully");
// TODO: check if chunk is the one the peer expects to receive.
assert!(response.result.is_ok());
}
let responses = futures::future::try_join_all(receivers)
.await
.expect("Chunk is always served successfully");
// TODO: check if chunk is the one the peer expects to receive.
assert!(responses.iter().all(|v| v.result.is_ok()));
let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis();
gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration);
let signing_context = SigningContext { session_index: 0, parent_hash: relay_block_hash };
let network = env.network().clone();
let authorities = env.authorities().clone();
let n_validators = config.n_validators;
// Spawn a task that will generate `n_validator` - 1 signed bitfiends and
// send them from the emulated peers to the subsystem.
// TODO: Implement topology.
env.spawn_blocking("send-bitfields", async move {
for index in 1..n_validators {
let validator_public =
authorities.validator_public.get(index).expect("All validator keys are known");
let messages = state.signed_bitfields.get(&relay_block_hash).expect("pregenerated").clone();
for index in 1..config.n_validators {
let from_peer = &authorities.validator_authority_id[index];
let message = messages.get(index).expect("pregenerated").clone();
// Node has all the chunks in the world.
let payload: AvailabilityBitfield =
AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
// TODO(soon): Use pre-signed messages. This is quite intensive on the CPU.
let signed_bitfield = Signed::<AvailabilityBitfield>::sign(
&authorities.keyring.keystore(),
payload,
&signing_context,
ValidatorIndex(index as u32),
validator_public,
)
.ok()
.flatten()
.expect("should be signed");
let from_peer = &authorities.validator_authority_id[index];
let message = peer_bitfield_message_v2(relay_block_hash, signed_bitfield);
// Send the action from peer only if it is connected to our node.
if network.is_peer_connected(from_peer) {
let _ = network.send_message_from_peer(from_peer, message);
}
// Send the action from peer only if it is connected to our node.
if network.is_peer_connected(from_peer) {
let _ = network.send_message_from_peer(from_peer, message);
}
});
}
gum::info!(
"Waiting for {} bitfields to be received and processed",
@@ -679,17 +451,3 @@ pub async fn benchmark_availability_write(
&["availability-distribution", "bitfield-distribution", "availability-store"],
)
}
pub fn peer_bitfield_message_v2(
relay_hash: H256,
signed_bitfield: Signed<AvailabilityBitfield>,
) -> VersionedValidationProtocol {
let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield(
relay_hash,
signed_bitfield.into(),
);
Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
bitfield,
))
}
@@ -0,0 +1,268 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::configuration::{TestAuthorities, TestConfiguration};
use bitvec::bitvec;
use colored::Colorize;
use itertools::Itertools;
use parity_scale_codec::Encode;
use polkadot_node_network_protocol::{
request_response::v1::ChunkFetchingRequest, Versioned, VersionedValidationProtocol,
};
use polkadot_node_primitives::{AvailableData, BlockData, ErasureChunk, PoV};
use polkadot_node_subsystem_test_helpers::{
derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info,
};
use polkadot_overseer::BlockInfo;
use polkadot_primitives::{
AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, Hash, HeadData, Header,
PersistedValidationData, Signed, SigningContext, ValidatorIndex,
};
use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash};
use sp_core::H256;
use std::{collections::HashMap, iter::Cycle, sync::Arc};
const LOG_TARGET: &str = "subsystem-bench::availability::test_state";
#[derive(Clone)]
pub struct TestState {
// Full test configuration
pub config: TestConfiguration,
// A cycle iterator on all PoV sizes used in the test.
pub pov_sizes: Cycle<std::vec::IntoIter<usize>>,
// Generated candidate receipts to be used in the test
pub candidates: Cycle<std::vec::IntoIter<CandidateReceipt>>,
// Map from pov size to candidate index
pub pov_size_to_candidate: HashMap<usize, usize>,
// Map from generated candidate hashes to candidate index in `available_data` and `chunks`.
pub candidate_hashes: HashMap<CandidateHash, usize>,
// Per candidate index receipts.
pub candidate_receipt_templates: Vec<CandidateReceipt>,
// Per candidate index `AvailableData`
pub available_data: Vec<AvailableData>,
// Per candiadte index chunks
pub chunks: Vec<Vec<ErasureChunk>>,
// Per relay chain block - candidate backed by our backing group
pub backed_candidates: Vec<CandidateReceipt>,
// Relay chain block infos
pub block_infos: Vec<BlockInfo>,
// Chung fetching requests for backed candidates
pub chunk_fetching_requests: Vec<Vec<Vec<u8>>>,
// Pregenerated signed availability bitfields
pub signed_bitfields: HashMap<H256, Vec<VersionedValidationProtocol>>,
// Relay chain block headers
pub block_headers: HashMap<H256, Header>,
// Authority keys for the network emulation.
pub test_authorities: TestAuthorities,
// Map from generated candidate receipts
pub candidate_receipts: HashMap<H256, Vec<CandidateReceipt>>,
}
impl TestState {
pub fn new(config: &TestConfiguration) -> Self {
let mut test_state = Self {
available_data: Default::default(),
candidate_receipt_templates: Default::default(),
chunks: Default::default(),
pov_size_to_candidate: Default::default(),
pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(),
candidate_hashes: HashMap::new(),
candidates: Vec::new().into_iter().cycle(),
backed_candidates: Vec::new(),
config: config.clone(),
block_infos: Default::default(),
chunk_fetching_requests: Default::default(),
signed_bitfields: Default::default(),
candidate_receipts: Default::default(),
block_headers: Default::default(),
test_authorities: config.generate_authorities(),
};
// we use it for all candidates.
let persisted_validation_data = PersistedValidationData {
parent_head: HeadData(vec![7, 8, 9]),
relay_parent_number: Default::default(),
max_pov_size: 1024,
relay_parent_storage_root: Default::default(),
};
// For each unique pov we create a candidate receipt.
for (index, pov_size) in config.pov_sizes().iter().cloned().unique().enumerate() {
gum::info!(target: LOG_TARGET, index, pov_size, "{}", "Generating template candidate".bright_blue());
let mut candidate_receipt = dummy_candidate_receipt(dummy_hash());
let pov = PoV { block_data: BlockData(vec![index as u8; pov_size]) };
let new_available_data = AvailableData {
validation_data: persisted_validation_data.clone(),
pov: Arc::new(pov),
};
let (new_chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
config.n_validators,
&new_available_data,
|_, _| {},
);
candidate_receipt.descriptor.erasure_root = erasure_root;
test_state.chunks.push(new_chunks);
test_state.available_data.push(new_available_data);
test_state.pov_size_to_candidate.insert(pov_size, index);
test_state.candidate_receipt_templates.push(candidate_receipt);
}
test_state.block_infos = (1..=config.num_blocks)
.map(|block_num| {
let relay_block_hash = Hash::repeat_byte(block_num as u8);
new_block_import_info(relay_block_hash, block_num as BlockNumber)
})
.collect();
test_state.block_headers = test_state
.block_infos
.iter()
.map(|info| {
(
info.hash,
Header {
digest: Default::default(),
number: info.number,
parent_hash: info.parent_hash,
extrinsics_root: Default::default(),
state_root: Default::default(),
},
)
})
.collect::<HashMap<_, _>>();
// Generate all candidates
let candidates_count = config.n_cores * config.num_blocks;
gum::info!(target: LOG_TARGET,"{}", format!("Pre-generating {} candidates.", candidates_count).bright_blue());
test_state.candidates = (0..candidates_count)
.map(|index| {
let pov_size = test_state.pov_sizes.next().expect("This is a cycle; qed");
let candidate_index = *test_state
.pov_size_to_candidate
.get(&pov_size)
.expect("pov_size always exists; qed");
let mut candidate_receipt =
test_state.candidate_receipt_templates[candidate_index].clone();
// Make it unique.
candidate_receipt.descriptor.relay_parent = Hash::from_low_u64_be(index as u64);
// Store the new candidate in the state
test_state.candidate_hashes.insert(candidate_receipt.hash(), candidate_index);
gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_receipt.hash(), "new candidate");
candidate_receipt
})
.collect::<Vec<_>>()
.into_iter()
.cycle();
// Prepare per block candidates.
// Genesis block is always finalized, so we start at 1.
for info in test_state.block_infos.iter() {
for _ in 0..config.n_cores {
let receipt = test_state.candidates.next().expect("Cycle iterator");
test_state.candidate_receipts.entry(info.hash).or_default().push(receipt);
}
// First candidate is our backed candidate.
test_state.backed_candidates.push(
test_state
.candidate_receipts
.get(&info.hash)
.expect("just inserted above")
.first()
.expect("just inserted above")
.clone(),
);
}
test_state.chunk_fetching_requests = test_state
.backed_candidates
.iter()
.map(|candidate| {
(0..config.n_validators)
.map(|index| {
ChunkFetchingRequest {
candidate_hash: candidate.hash(),
index: ValidatorIndex(index as u32),
}
.encode()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
test_state.signed_bitfields = test_state
.block_infos
.iter()
.map(|block_info| {
let signing_context =
SigningContext { session_index: 0, parent_hash: block_info.hash };
let messages = (0..config.n_validators)
.map(|index| {
let validator_public = test_state
.test_authorities
.validator_public
.get(index)
.expect("All validator keys are known");
// Node has all the chunks in the world.
let payload: AvailabilityBitfield =
AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
let signed_bitfield = Signed::<AvailabilityBitfield>::sign(
&test_state.test_authorities.keyring.keystore(),
payload,
&signing_context,
ValidatorIndex(index as u32),
validator_public,
)
.ok()
.flatten()
.expect("should be signed");
peer_bitfield_message_v2(block_info.hash, signed_bitfield)
})
.collect::<Vec<_>>();
(block_info.hash, messages)
})
.collect();
gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue());
test_state
}
}
fn peer_bitfield_message_v2(
relay_hash: H256,
signed_bitfield: Signed<AvailabilityBitfield>,
) -> VersionedValidationProtocol {
let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield(
relay_hash,
signed_bitfield.into(),
);
Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
bitfield,
))
}
@@ -118,6 +118,7 @@ fn new_runtime() -> tokio::runtime::Runtime {
.thread_name("subsystem-bench")
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.worker_threads(4)
.build()
.unwrap()
}
@@ -26,4 +26,3 @@ pub(crate) mod keyring;
pub(crate) mod mock;
pub(crate) mod network;
pub mod usage;
pub mod utils;
@@ -41,6 +41,7 @@ const LOG_TARGET: &str = "subsystem-bench::av-store-mock";
/// Mockup helper. Contains Ccunks and full availability data of all parachain blocks
/// used in a test.
#[derive(Clone)]
pub struct NetworkAvailabilityState {
pub candidate_hashes: HashMap<CandidateHash, usize>,
pub available_data: Vec<AvailableData>,
@@ -36,6 +36,7 @@ use std::collections::HashMap;
const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock";
/// Minimal state to answer requests.
#[derive(Clone)]
pub struct RuntimeApiState {
// All authorities in the test,
authorities: TestAuthorities,
@@ -49,6 +50,7 @@ pub struct RuntimeApiState {
}
/// A mocked `runtime-api` subsystem.
#[derive(Clone)]
pub struct MockRuntimeApi {
state: RuntimeApiState,
config: TestConfiguration,
+11 -4
View File
@@ -17,6 +17,7 @@
//! Test usage implementation
use colored::Colorize;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -37,10 +38,16 @@ impl std::fmt::Display for BenchmarkUsage {
self.network_usage
.iter()
.map(|v| v.to_string())
.sorted()
.collect::<Vec<String>>()
.join("\n"),
format!("{:<32}{:>12}{:>12}", "CPU usage, seconds", "total", "per block").blue(),
self.cpu_usage.iter().map(|v| v.to_string()).collect::<Vec<String>>().join("\n")
self.cpu_usage
.iter()
.map(|v| v.to_string())
.sorted()
.collect::<Vec<String>>()
.join("\n")
)
}
}
@@ -101,8 +108,8 @@ fn check_resource_usage(
None
} else {
Some(format!(
"The resource `{}` is expected to be equal to {} with a precision {}, but the current value is {}",
resource_name, base, precision, usage.per_block
"The resource `{}` is expected to be equal to {} with a precision {}, but the current value is {} ({})",
resource_name, base, precision, usage.per_block, diff
))
}
} else {
@@ -119,7 +126,7 @@ pub struct ResourceUsage {
impl std::fmt::Display for ResourceUsage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:<32}{:>12.3}{:>12.3}", self.resource_name.cyan(), self.total, self.per_block)
write!(f, "{:<32}{:>12.4}{:>12.4}", self.resource_name.cyan(), self.total, self.per_block)
}
}
@@ -1,76 +0,0 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test utils
use crate::usage::BenchmarkUsage;
use std::io::{stdout, Write};
pub struct WarmUpOptions<'a> {
/// The maximum number of runs considered for marming up.
pub warm_up: usize,
/// The number of runs considered for benchmarking.
pub bench: usize,
/// The difference in CPU usage between runs considered as normal
pub precision: f64,
/// The subsystems whose CPU usage is checked during warm-up cycles
pub subsystems: &'a [&'a str],
}
impl<'a> WarmUpOptions<'a> {
pub fn new(subsystems: &'a [&'a str]) -> Self {
Self { warm_up: 100, bench: 3, precision: 0.02, subsystems }
}
}
pub fn warm_up_and_benchmark(
options: WarmUpOptions,
run: impl Fn() -> BenchmarkUsage,
) -> Result<BenchmarkUsage, String> {
println!("Warming up...");
let mut usages = Vec::with_capacity(options.bench);
for n in 1..=options.warm_up {
let curr = run();
if let Some(prev) = usages.last() {
let diffs = options
.subsystems
.iter()
.map(|&v| {
curr.cpu_usage_diff(prev, v)
.ok_or(format!("{} not found in benchmark {:?}", v, prev))
})
.collect::<Result<Vec<f64>, String>>()?;
if !diffs.iter().all(|&v| v < options.precision) {
usages.clear();
}
}
usages.push(curr);
print!("\r{}%", n * 100 / options.warm_up);
if usages.len() == options.bench {
println!("\rTook {} runs to warm up", n.saturating_sub(options.bench));
break;
}
stdout().flush().unwrap();
}
if usages.len() != options.bench {
println!("Didn't warm up after {} runs", options.warm_up);
return Err("Can't warm up".to_string())
}
Ok(BenchmarkUsage::average(&usages))
}