subsystem-bench: add regression tests for availability read and write (#3311)

### What's been done
- `subsystem-bench` has been split into two parts: a cli benchmark
runner and a library.
- The cli runner is quite simple. It just allows us to run `.yaml` based
test sequences. Now it should only be used to run benchmarks during
development.
- The library is used in the cli runner and in regression tests. Some
code is changed to make the library independent of the runner.
- Added first regression tests for availability read and write that
replicate existing test sequences.

### How we run regression tests
- Regression tests are simply rust integration tests without the
harnesses.
- They should only be compiled under the `subsystem-benchmarks` feature
to prevent them from running with other tests.
- This doesn't work when running tests with `nextest` in CI, so
additional filters have been added to the `nextest` runs.
- Each benchmark run takes a different time in the beginning, so we
"warm up" the tests until their CPU usage differs by only 1%.
- After the warm-up, we run the benchmarks a few more times and compare
the average with the exception using a precision.

### What is still wrong?
- I haven't managed to set up approval voting tests. The spread of their
results is too large and can't be narrowed down in a reasonable amount
of time in the warm-up phase.
- The tests start an unconfigurable prometheus endpoint inside, which
causes errors because they use the same 9999 port. I disable it with a
flag, but I think it's better to extract the endpoint launching outside
the test, as we already do with `valgrind` and `pyroscope`. But we still
use `prometheus` inside the tests.

### Future work
* https://github.com/paritytech/polkadot-sdk/issues/3528
* https://github.com/paritytech/polkadot-sdk/issues/3529
* https://github.com/paritytech/polkadot-sdk/issues/3530
* https://github.com/paritytech/polkadot-sdk/issues/3531

---------

Co-authored-by: Alexander Samusev <41779041+alvicsam@users.noreply.github.com>
This commit is contained in:
Andrei Eres
2024-03-01 15:30:43 +01:00
committed by GitHub
parent 6f81a4a092
commit f0e589d72e
35 changed files with 712 additions and 412 deletions
@@ -0,0 +1,207 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::configuration::TestAuthorities;
use itertools::Itertools;
use polkadot_node_core_approval_voting::time::{Clock, SystemClock, Tick};
use polkadot_node_network_protocol::{
grid_topology::{SessionGridTopology, TopologyPeerInfo},
View,
};
use polkadot_node_subsystem_types::messages::{
network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, NetworkBridgeEvent,
};
use polkadot_overseer::AllMessages;
use polkadot_primitives::{
BlockNumber, CandidateEvent, CandidateReceipt, CoreIndex, GroupIndex, Hash, Header,
Id as ParaId, Slot, ValidatorIndex,
};
use polkadot_primitives_test_helpers::dummy_candidate_receipt_bad_sig;
use rand::{seq::SliceRandom, SeedableRng};
use rand_chacha::ChaCha20Rng;
use sc_network::PeerId;
use sp_consensus_babe::{
digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, VrfSignature, VrfTranscript,
};
use sp_core::crypto::VrfSecret;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use sp_runtime::{Digest, DigestItem};
use std::sync::{atomic::AtomicU64, Arc};
/// A fake system clock used for driving the approval voting and make
/// it process blocks, assignments and approvals from the past.
#[derive(Clone)]
pub struct PastSystemClock {
/// The real system clock
real_system_clock: SystemClock,
/// The difference in ticks between the real system clock and the current clock.
delta_ticks: Arc<AtomicU64>,
}
impl PastSystemClock {
/// Creates a new fake system clock with `delta_ticks` between the real time and the fake one.
pub fn new(real_system_clock: SystemClock, delta_ticks: Arc<AtomicU64>) -> Self {
PastSystemClock { real_system_clock, delta_ticks }
}
}
impl Clock for PastSystemClock {
fn tick_now(&self) -> Tick {
self.real_system_clock.tick_now() -
self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst)
}
fn wait(
&self,
tick: Tick,
) -> std::pin::Pin<Box<dyn futures::prelude::Future<Output = ()> + Send + 'static>> {
self.real_system_clock
.wait(tick + self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst))
}
}
/// Helper function to generate a babe epoch for this benchmark.
/// It does not change for the duration of the test.
pub fn generate_babe_epoch(current_slot: Slot, authorities: TestAuthorities) -> BabeEpoch {
let authorities = authorities
.validator_babe_id
.into_iter()
.enumerate()
.map(|(index, public)| (public, index as u64))
.collect_vec();
BabeEpoch {
epoch_index: 1,
start_slot: current_slot.saturating_sub(1u64),
duration: 200,
authorities,
randomness: [0xde; 32],
config: BabeEpochConfiguration { c: (1, 4), allowed_slots: AllowedSlots::PrimarySlots },
}
}
/// Generates a topology to be used for this benchmark.
pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology {
let keyrings = test_authorities
.validator_authority_id
.clone()
.into_iter()
.zip(test_authorities.peer_ids.clone())
.collect_vec();
let topology = keyrings
.clone()
.into_iter()
.enumerate()
.map(|(index, (discovery_id, peer_id))| TopologyPeerInfo {
peer_ids: vec![peer_id],
validator_index: ValidatorIndex(index as u32),
discovery_id,
})
.collect_vec();
let shuffled = (0..keyrings.len()).collect_vec();
SessionGridTopology::new(shuffled, topology)
}
/// Generates new session topology message.
pub fn generate_new_session_topology(
test_authorities: &TestAuthorities,
test_node: ValidatorIndex,
) -> Vec<AllMessages> {
let topology = generate_topology(test_authorities);
let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session: 1,
topology,
local_index: Some(test_node),
});
vec![AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(event))]
}
/// Generates a peer view change for the passed `block_hash`
pub fn generate_peer_view_change_for(block_hash: Hash, peer_id: PeerId) -> AllMessages {
let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0));
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(network))
}
/// Helper function to create a a signature for the block header.
fn garbage_vrf_signature() -> VrfSignature {
let transcript = VrfTranscript::new(b"test-garbage", &[]);
Sr25519Keyring::Alice.pair().vrf_sign(&transcript.into())
}
/// Helper function to create a block header.
pub fn make_header(parent_hash: Hash, slot: Slot, number: u32) -> Header {
let digest =
{
let mut digest = Digest::default();
let vrf_signature = garbage_vrf_signature();
digest.push(DigestItem::babe_pre_digest(PreDigest::SecondaryVRF(
SecondaryVRFPreDigest { authority_index: 0, slot, vrf_signature },
)));
digest
};
Header {
digest,
extrinsics_root: Default::default(),
number,
state_root: Default::default(),
parent_hash,
}
}
/// Helper function to create a candidate receipt.
fn make_candidate(para_id: ParaId, hash: &Hash) -> CandidateReceipt {
let mut r = dummy_candidate_receipt_bad_sig(*hash, Some(Default::default()));
r.descriptor.para_id = para_id;
r
}
/// Helper function to create a list of candidates that are included in the block
pub fn make_candidates(
block_hash: Hash,
block_number: BlockNumber,
num_cores: u32,
num_candidates: u32,
) -> Vec<CandidateEvent> {
let seed = [block_number as u8; 32];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let mut candidates = (0..num_cores)
.map(|core| {
CandidateEvent::CandidateIncluded(
make_candidate(ParaId::from(core), &block_hash),
Vec::new().into(),
CoreIndex(core),
GroupIndex(core),
)
})
.collect_vec();
let (candidates, _) = candidates.partial_shuffle(&mut rand_chacha, num_candidates as usize);
candidates
.iter_mut()
.map(|val| val.clone())
.sorted_by(|a, b| match (a, b) {
(
CandidateEvent::CandidateIncluded(_, _, core_a, _),
CandidateEvent::CandidateIncluded(_, _, core_b, _),
) => core_a.0.cmp(&core_b.0),
(_, _) => todo!("Should not happen"),
})
.collect_vec()
}
@@ -0,0 +1,676 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
approval::{
helpers::{generate_babe_epoch, generate_topology},
test_message::{MessagesBundle, TestMessageInfo},
ApprovalTestState, ApprovalsOptions, BlockTestData, GeneratedState,
BUFFER_FOR_GENERATION_MILLIS, LOG_TARGET, SLOT_DURATION_MILLIS,
},
configuration::{TestAuthorities, TestConfiguration},
mock::runtime_api::session_info_for_peers,
NODE_UNDER_TEST,
};
use futures::SinkExt;
use itertools::Itertools;
use parity_scale_codec::Encode;
use polkadot_node_core_approval_voting::{
criteria::{compute_assignments, Config},
time::tranche_to_tick,
};
use polkadot_node_network_protocol::{
grid_topology::{GridNeighbors, RandomRouting, RequiredRouting, SessionGridTopology},
v3 as protocol_v3,
};
use polkadot_node_primitives::approval::{
self,
v2::{CoreBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2},
};
use polkadot_primitives::{
vstaging::ApprovalVoteMultipleCandidates, CandidateEvent, CandidateHash, CandidateIndex,
CoreIndex, Hash, SessionInfo, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID,
};
use rand::{seq::SliceRandom, RngCore, SeedableRng};
use rand_chacha::ChaCha20Rng;
use rand_distr::{Distribution, Normal};
use sc_keystore::LocalKeystore;
use sc_network::PeerId;
use sc_service::SpawnTaskHandle;
use sha1::Digest;
use sp_application_crypto::AppCrypto;
use sp_consensus_babe::SlotDuration;
use sp_keystore::Keystore;
use sp_timestamp::Timestamp;
use std::{
cmp::max,
collections::{BTreeMap, HashSet},
fs,
io::Write,
path::{Path, PathBuf},
time::Duration,
};
/// A generator of messages coming from a given Peer/Validator
pub struct PeerMessagesGenerator {
/// The grid neighbors of the node under test.
pub topology_node_under_test: GridNeighbors,
/// The topology of the network for the epoch under test.
pub topology: SessionGridTopology,
/// The validator index for this object generates the messages.
pub validator_index: ValidatorIndex,
/// An array of pre-generated random samplings, that is used to determine, which nodes would
/// send a given assignment, to the node under test because of the random samplings.
/// As an optimization we generate this sampling at the begining of the test and just pick
/// one randomly, because always taking the samples would be too expensive for benchamrk.
pub random_samplings: Vec<Vec<ValidatorIndex>>,
/// Channel for sending the generated messages to the aggregator
pub tx_messages: futures::channel::mpsc::UnboundedSender<(Hash, Vec<MessagesBundle>)>,
/// The list of test authorities
pub test_authorities: TestAuthorities,
//// The session info used for the test.
pub session_info: SessionInfo,
/// The blocks used for testing
pub blocks: Vec<BlockTestData>,
/// Approval options params.
pub options: ApprovalsOptions,
}
impl PeerMessagesGenerator {
/// Generates messages by spawning a blocking task in the background which begins creating
/// the assignments/approvals and peer view changes at the begining of each block.
pub fn generate_messages(mut self, spawn_task_handle: &SpawnTaskHandle) {
spawn_task_handle.spawn("generate-messages", "generate-messages", async move {
for block_info in &self.blocks {
let assignments = self.generate_assignments(block_info);
let bytes = self.validator_index.0.to_be_bytes();
let seed = [
bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let approvals = issue_approvals(
assignments,
block_info.hash,
&self.test_authorities.validator_public,
block_info.candidates.clone(),
&self.options,
&mut rand_chacha,
self.test_authorities.keyring.keystore_ref(),
);
self.tx_messages
.send((block_info.hash, approvals))
.await
.expect("Should not fail");
}
})
}
// Builds the messages finger print corresponding to this configuration.
// When the finger print exists already on disk the messages are not re-generated.
fn messages_fingerprint(
configuration: &TestConfiguration,
options: &ApprovalsOptions,
) -> String {
let mut fingerprint = options.fingerprint();
let configuration_bytes = bincode::serialize(&configuration).unwrap();
fingerprint.extend(configuration_bytes);
let mut sha1 = sha1::Sha1::new();
sha1.update(fingerprint);
let result = sha1.finalize();
hex::encode(result)
}
/// Generate all messages(Assignments & Approvals) needed for approving `blocks``.
pub fn generate_messages_if_needed(
configuration: &TestConfiguration,
test_authorities: &TestAuthorities,
options: &ApprovalsOptions,
spawn_task_handle: &SpawnTaskHandle,
) -> PathBuf {
let path_name = format!(
"{}/{}",
options.workdir_prefix,
Self::messages_fingerprint(configuration, options)
);
let path = Path::new(&path_name);
if path.exists() {
return path.to_path_buf();
}
gum::info!("Generate message because file does not exist");
let delta_to_first_slot_under_test = Timestamp::new(BUFFER_FOR_GENERATION_MILLIS);
let initial_slot = Slot::from_timestamp(
(*Timestamp::current() - *delta_to_first_slot_under_test).into(),
SlotDuration::from_millis(SLOT_DURATION_MILLIS),
);
let babe_epoch = generate_babe_epoch(initial_slot, test_authorities.clone());
let session_info = session_info_for_peers(configuration, test_authorities);
let blocks = ApprovalTestState::generate_blocks_information(
configuration,
&babe_epoch,
initial_slot,
);
gum::info!(target: LOG_TARGET, "Generate messages");
let topology = generate_topology(test_authorities);
let random_samplings = random_samplings_to_node(
ValidatorIndex(NODE_UNDER_TEST),
test_authorities.validator_public.len(),
test_authorities.validator_public.len() * 2,
);
let topology_node_under_test =
topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap();
let (tx, mut rx) = futures::channel::mpsc::unbounded();
// Spawn a thread to generate the messages for each validator, so that we speed up the
// generation.
for current_validator_index in 1..test_authorities.validator_public.len() {
let peer_message_source = PeerMessagesGenerator {
topology_node_under_test: topology_node_under_test.clone(),
topology: topology.clone(),
validator_index: ValidatorIndex(current_validator_index as u32),
test_authorities: test_authorities.clone(),
session_info: session_info.clone(),
blocks: blocks.clone(),
tx_messages: tx.clone(),
random_samplings: random_samplings.clone(),
options: options.clone(),
};
peer_message_source.generate_messages(spawn_task_handle);
}
std::mem::drop(tx);
let seed = [0x32; 32];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let mut all_messages: BTreeMap<u64, Vec<MessagesBundle>> = BTreeMap::new();
// Receive all messages and sort them by Tick they have to be sent.
loop {
match rx.try_next() {
Ok(Some((block_hash, messages))) =>
for message in messages {
let block_info = blocks
.iter()
.find(|val| val.hash == block_hash)
.expect("Should find blocks");
let tick_to_send = tranche_to_tick(
SLOT_DURATION_MILLIS,
block_info.slot,
message.tranche_to_send(),
);
let to_add = all_messages.entry(tick_to_send).or_default();
to_add.push(message);
},
Ok(None) => break,
Err(_) => {
std::thread::sleep(Duration::from_millis(50));
},
}
}
let all_messages = all_messages
.into_iter()
.flat_map(|(_, mut messages)| {
// Shuffle the messages inside the same tick, so that we don't priorites messages
// for older nodes. we try to simulate the same behaviour as in real world.
messages.shuffle(&mut rand_chacha);
messages
})
.collect_vec();
gum::info!("Generated a number of {:} unique messages", all_messages.len());
let generated_state = GeneratedState { all_messages: Some(all_messages), initial_slot };
let mut messages_file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.unwrap();
messages_file
.write_all(&generated_state.encode())
.expect("Could not update message file");
path.to_path_buf()
}
/// Generates assignments for the given `current_validator_index`
/// Returns a list of assignments to be sent sorted by tranche.
fn generate_assignments(&self, block_info: &BlockTestData) -> Vec<TestMessageInfo> {
let config = Config::from(&self.session_info);
let leaving_cores = block_info
.candidates
.clone()
.into_iter()
.map(|candidate_event| {
if let CandidateEvent::CandidateIncluded(candidate, _, core_index, group_index) =
candidate_event
{
(candidate.hash(), core_index, group_index)
} else {
todo!("Variant is never created in this benchmark")
}
})
.collect_vec();
let mut assignments_by_tranche = BTreeMap::new();
let bytes = self.validator_index.0.to_be_bytes();
let seed = [
bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
let to_be_sent_by = neighbours_that_would_sent_message(
&self.test_authorities.peer_ids,
self.validator_index.0,
&self.topology_node_under_test,
&self.topology,
);
let leaving_cores = leaving_cores
.clone()
.into_iter()
.filter(|(_, core_index, _group_index)| core_index.0 != self.validator_index.0)
.collect_vec();
let store = LocalKeystore::in_memory();
let _public = store
.sr25519_generate_new(
ASSIGNMENT_KEY_TYPE_ID,
Some(self.test_authorities.key_seeds[self.validator_index.0 as usize].as_str()),
)
.expect("should not fail");
let assignments = compute_assignments(
&store,
block_info.relay_vrf_story.clone(),
&config,
leaving_cores.clone(),
self.options.enable_assignments_v2,
);
let random_sending_nodes = self
.random_samplings
.get(rand_chacha.next_u32() as usize % self.random_samplings.len())
.unwrap();
let random_sending_peer_ids = random_sending_nodes
.iter()
.map(|validator| (*validator, self.test_authorities.peer_ids[validator.0 as usize]))
.collect_vec();
let mut unique_assignments = HashSet::new();
for (core_index, assignment) in assignments {
let assigned_cores = match &assignment.cert().kind {
approval::v2::AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
core_bitfield.iter_ones().map(|val| CoreIndex::from(val as u32)).collect_vec(),
approval::v2::AssignmentCertKindV2::RelayVRFDelay { core_index } =>
vec![*core_index],
approval::v2::AssignmentCertKindV2::RelayVRFModulo { sample: _ } =>
vec![core_index],
};
let bitfiled: CoreBitfield = assigned_cores.clone().try_into().unwrap();
// For the cases where tranch0 assignments are in a single certificate we need to make
// sure we create a single message.
if unique_assignments.insert(bitfiled) {
let this_tranche_assignments =
assignments_by_tranche.entry(assignment.tranche()).or_insert_with(Vec::new);
this_tranche_assignments.push((
IndirectAssignmentCertV2 {
block_hash: block_info.hash,
validator: self.validator_index,
cert: assignment.cert().clone(),
},
block_info
.candidates
.iter()
.enumerate()
.filter(|(_index, candidate)| {
if let CandidateEvent::CandidateIncluded(_, _, core, _) = candidate {
assigned_cores.contains(core)
} else {
panic!("Should not happen");
}
})
.map(|(index, _)| index as u32)
.collect_vec()
.try_into()
.unwrap(),
to_be_sent_by
.iter()
.chain(random_sending_peer_ids.iter())
.copied()
.collect::<HashSet<(ValidatorIndex, PeerId)>>(),
assignment.tranche(),
));
}
}
assignments_by_tranche
.into_values()
.flat_map(|assignments| assignments.into_iter())
.map(|assignment| {
let msg = protocol_v3::ApprovalDistributionMessage::Assignments(vec![(
assignment.0,
assignment.1,
)]);
TestMessageInfo {
msg,
sent_by: assignment
.2
.into_iter()
.map(|(validator_index, _)| validator_index)
.collect_vec(),
tranche: assignment.3,
block_hash: block_info.hash,
}
})
.collect_vec()
}
}
/// A list of random samplings that we use to determine which nodes should send a given message to
/// the node under test.
/// We can not sample every time for all the messages because that would be too expensive to
/// perform, so pre-generate a list of samples for a given network size.
/// - result[i] give us as a list of random nodes that would send a given message to the node under
/// test.
fn random_samplings_to_node(
node_under_test: ValidatorIndex,
num_validators: usize,
num_samplings: usize,
) -> Vec<Vec<ValidatorIndex>> {
let seed = [7u8; 32];
let mut rand_chacha = ChaCha20Rng::from_seed(seed);
(0..num_samplings)
.map(|_| {
(0..num_validators)
.filter(|sending_validator_index| {
*sending_validator_index != NODE_UNDER_TEST as usize
})
.flat_map(|sending_validator_index| {
let mut validators = (0..num_validators).collect_vec();
validators.shuffle(&mut rand_chacha);
let mut random_routing = RandomRouting::default();
validators
.into_iter()
.flat_map(|validator_to_send| {
if random_routing.sample(num_validators, &mut rand_chacha) {
random_routing.inc_sent();
if validator_to_send == node_under_test.0 as usize {
Some(ValidatorIndex(sending_validator_index as u32))
} else {
None
}
} else {
None
}
})
.collect_vec()
})
.collect_vec()
})
.collect_vec()
}
/// Helper function to randomly determine how many approvals we coalesce together in a single
/// message.
fn coalesce_approvals_len(
coalesce_mean: f32,
coalesce_std_dev: f32,
rand_chacha: &mut ChaCha20Rng,
) -> usize {
max(
1,
Normal::new(coalesce_mean, coalesce_std_dev)
.expect("normal distribution parameters are good")
.sample(rand_chacha)
.round() as i32,
) as usize
}
/// Helper function to create approvals signatures for all assignments passed as arguments.
/// Returns a list of Approvals messages that need to be sent.
fn issue_approvals(
assignments: Vec<TestMessageInfo>,
block_hash: Hash,
validator_ids: &[ValidatorId],
candidates: Vec<CandidateEvent>,
options: &ApprovalsOptions,
rand_chacha: &mut ChaCha20Rng,
store: &LocalKeystore,
) -> Vec<MessagesBundle> {
let mut queued_to_sign: Vec<TestSignInfo> = Vec::new();
let mut num_coalesce =
coalesce_approvals_len(options.coalesce_mean, options.coalesce_std_dev, rand_chacha);
let result = assignments
.iter()
.enumerate()
.map(|(_index, message)| match &message.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
let mut approvals_to_create = Vec::new();
let current_validator_index = queued_to_sign
.first()
.map(|msg| msg.validator_index)
.unwrap_or(ValidatorIndex(99999));
// Invariant for this benchmark.
assert_eq!(assignments.len(), 1);
let assignment = assignments.first().unwrap();
let earliest_tranche = queued_to_sign
.first()
.map(|val| val.assignment.tranche)
.unwrap_or(message.tranche);
if queued_to_sign.len() >= num_coalesce ||
(!queued_to_sign.is_empty() &&
current_validator_index != assignment.0.validator) ||
message.tranche - earliest_tranche >= options.coalesce_tranche_diff
{
approvals_to_create.push(TestSignInfo::sign_candidates(
&mut queued_to_sign,
validator_ids,
block_hash,
num_coalesce,
store,
));
num_coalesce = coalesce_approvals_len(
options.coalesce_mean,
options.coalesce_std_dev,
rand_chacha,
);
}
// If more that one candidate was in the assignment queue all of them for issuing
// approvals
for candidate_index in assignment.1.iter_ones() {
let candidate = candidates.get(candidate_index).unwrap();
if let CandidateEvent::CandidateIncluded(candidate, _, _, _) = candidate {
queued_to_sign.push(TestSignInfo {
candidate_hash: candidate.hash(),
candidate_index: candidate_index as CandidateIndex,
validator_index: assignment.0.validator,
assignment: message.clone(),
});
} else {
todo!("Other enum variants are not used in this benchmark");
}
}
approvals_to_create
},
_ => {
todo!("Other enum variants are not used in this benchmark");
},
})
.collect_vec();
let mut messages = result.into_iter().flatten().collect_vec();
if !queued_to_sign.is_empty() {
messages.push(TestSignInfo::sign_candidates(
&mut queued_to_sign,
validator_ids,
block_hash,
num_coalesce,
store,
));
}
messages
}
/// Helper struct to gather information about more than one candidate an sign it in a single
/// approval message.
struct TestSignInfo {
/// The candidate hash
candidate_hash: CandidateHash,
/// The candidate index
candidate_index: CandidateIndex,
/// The validator sending the assignments
validator_index: ValidatorIndex,
/// The assignments convering this candidate
assignment: TestMessageInfo,
}
impl TestSignInfo {
/// Helper function to create a signture for all candidates in `to_sign` parameter.
/// Returns a TestMessage
fn sign_candidates(
to_sign: &mut Vec<TestSignInfo>,
validator_ids: &[ValidatorId],
block_hash: Hash,
num_coalesce: usize,
store: &LocalKeystore,
) -> MessagesBundle {
let current_validator_index = to_sign.first().map(|val| val.validator_index).unwrap();
let tranche_approval_can_be_sent =
to_sign.iter().map(|val| val.assignment.tranche).max().unwrap();
let validator_id = validator_ids.get(current_validator_index.0 as usize).unwrap().clone();
let unique_assignments: HashSet<TestMessageInfo> =
to_sign.iter().map(|info| info.assignment.clone()).collect();
let mut to_sign = to_sign
.drain(..)
.sorted_by(|val1, val2| val1.candidate_index.cmp(&val2.candidate_index))
.peekable();
let mut bundle = MessagesBundle {
assignments: unique_assignments.into_iter().collect_vec(),
approvals: Vec::new(),
};
while to_sign.peek().is_some() {
let to_sign = to_sign.by_ref().take(num_coalesce).collect_vec();
let hashes = to_sign.iter().map(|val| val.candidate_hash).collect_vec();
let candidate_indices = to_sign.iter().map(|val| val.candidate_index).collect_vec();
let sent_by = to_sign
.iter()
.flat_map(|val| val.assignment.sent_by.iter())
.copied()
.collect::<HashSet<ValidatorIndex>>();
let payload = ApprovalVoteMultipleCandidates(&hashes).signing_payload(1);
let signature = store
.sr25519_sign(ValidatorId::ID, &validator_id.clone().into(), &payload[..])
.unwrap()
.unwrap()
.into();
let indirect = IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: candidate_indices.try_into().unwrap(),
validator: current_validator_index,
signature,
};
let msg = protocol_v3::ApprovalDistributionMessage::Approvals(vec![indirect]);
bundle.approvals.push(TestMessageInfo {
msg,
sent_by: sent_by.into_iter().collect_vec(),
tranche: tranche_approval_can_be_sent,
block_hash,
});
}
bundle
}
}
/// Determine what neighbours would send a given message to the node under test.
fn neighbours_that_would_sent_message(
peer_ids: &[PeerId],
current_validator_index: u32,
topology_node_under_test: &GridNeighbors,
topology: &SessionGridTopology,
) -> Vec<(ValidatorIndex, PeerId)> {
let topology_originator = topology
.compute_grid_neighbors_for(ValidatorIndex(current_validator_index))
.unwrap();
let originator_y = topology_originator.validator_indices_y.iter().find(|validator| {
topology_node_under_test.required_routing_by_index(**validator, false) ==
RequiredRouting::GridY
});
assert!(originator_y != Some(&ValidatorIndex(NODE_UNDER_TEST)));
let originator_x = topology_originator.validator_indices_x.iter().find(|validator| {
topology_node_under_test.required_routing_by_index(**validator, false) ==
RequiredRouting::GridX
});
assert!(originator_x != Some(&ValidatorIndex(NODE_UNDER_TEST)));
let is_neighbour = topology_originator
.validator_indices_x
.contains(&ValidatorIndex(NODE_UNDER_TEST)) ||
topology_originator
.validator_indices_y
.contains(&ValidatorIndex(NODE_UNDER_TEST));
let mut to_be_sent_by = originator_y
.into_iter()
.chain(originator_x)
.map(|val| (*val, peer_ids[val.0 as usize]))
.collect_vec();
if is_neighbour {
to_be_sent_by.push((ValidatorIndex(current_validator_index), peer_ids[0]));
}
to_be_sent_by
}
@@ -0,0 +1,64 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::approval::{ApprovalTestState, PastSystemClock, LOG_TARGET, SLOT_DURATION_MILLIS};
use futures::FutureExt;
use polkadot_node_core_approval_voting::time::{slot_number_to_tick, Clock, TICK_DURATION_MILLIS};
use polkadot_node_subsystem::{overseer, SpawnedSubsystem, SubsystemError};
use polkadot_node_subsystem_types::messages::ChainSelectionMessage;
/// Mock ChainSelection subsystem used to answer request made by the approval-voting subsystem,
/// during benchmark. All the necessary information to answer the requests is stored in the `state`
pub struct MockChainSelection {
pub state: ApprovalTestState,
pub clock: PastSystemClock,
}
#[overseer::subsystem(ChainSelection, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockChainSelection {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "mock-chain-subsystem", future }
}
}
#[overseer::contextbounds(ChainSelection, prefix = self::overseer)]
impl MockChainSelection {
async fn run<Context>(self, mut ctx: Context) {
loop {
let msg = ctx.recv().await.expect("Should not fail");
match msg {
orchestra::FromOrchestra::Signal(_) => {},
orchestra::FromOrchestra::Communication { msg } =>
if let ChainSelectionMessage::Approved(hash) = msg {
let block_info = self.state.get_info_by_hash(hash);
let approved_number = block_info.block_number;
block_info.approved.store(true, std::sync::atomic::Ordering::SeqCst);
self.state
.last_approved_block
.store(approved_number, std::sync::atomic::Ordering::SeqCst);
let approved_in_tick = self.clock.tick_now() -
slot_number_to_tick(SLOT_DURATION_MILLIS, block_info.slot);
gum::info!(target: LOG_TARGET, ?hash, "Chain selection approved after {:} ms", approved_in_tick * TICK_DURATION_MILLIS);
},
}
}
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,305 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
approval::{ApprovalsOptions, BlockTestData, CandidateTestData},
configuration::TestAuthorities,
};
use itertools::Itertools;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_network_protocol::v3 as protocol_v3;
use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex};
use sc_network::PeerId;
use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct TestMessageInfo {
/// The actual message
pub msg: protocol_v3::ApprovalDistributionMessage,
/// The list of peers that would sends this message in a real topology.
/// It includes both the peers that would send the message because of the topology
/// or because of randomly chosing so.
pub sent_by: Vec<ValidatorIndex>,
/// The tranche at which this message should be sent.
pub tranche: u32,
/// The block hash this message refers to.
pub block_hash: Hash,
}
impl std::hash::Hash for TestMessageInfo {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
for (assignment, candidates) in assignments {
(assignment.block_hash, assignment.validator).hash(state);
candidates.hash(state);
}
},
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => {
for approval in approvals {
(approval.block_hash, approval.validator).hash(state);
approval.candidate_indices.hash(state);
}
},
};
}
}
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
/// A list of messages that depend of each-other, approvals cover one of the assignments and
/// vice-versa.
pub struct MessagesBundle {
pub assignments: Vec<TestMessageInfo>,
pub approvals: Vec<TestMessageInfo>,
}
impl MessagesBundle {
/// The tranche when this bundle can be sent correctly, so no assignments or approvals will be
/// from the future.
pub fn tranche_to_send(&self) -> u32 {
self.assignments
.iter()
.chain(self.approvals.iter())
.max_by(|a, b| a.tranche.cmp(&b.tranche))
.unwrap()
.tranche
}
/// The min tranche in the bundle.
pub fn min_tranche(&self) -> u32 {
self.assignments
.iter()
.chain(self.approvals.iter())
.min_by(|a, b| a.tranche.cmp(&b.tranche))
.unwrap()
.tranche
}
/// Tells if the bundle is needed for sending.
/// We either send it because we need more assignments and approvals to approve the candidates
/// or because we configured the test to send messages untill a given tranche.
pub fn should_send(
&self,
candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
options: &ApprovalsOptions,
) -> bool {
self.needed_for_approval(candidates_test_data) ||
(!options.stop_when_approved &&
self.min_tranche() <= options.last_considered_tranche)
}
/// Tells if the bundle is needed because we need more messages to approve the candidates.
pub fn needed_for_approval(
&self,
candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
) -> bool {
self.assignments
.iter()
.any(|message| message.needed_for_approval(candidates_test_data))
}
/// Mark the assignments in the bundle as sent.
pub fn record_sent_assignment(
&self,
candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
) {
self.assignments
.iter()
.for_each(|assignment| assignment.record_sent_assignment(candidates_test_data));
}
}
impl TestMessageInfo {
/// Tells if the message is an approval.
fn is_approval(&self) -> bool {
match self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(_) => false,
protocol_v3::ApprovalDistributionMessage::Approvals(_) => true,
}
}
/// Records an approval.
/// We use this to check after all messages have been processed that we didn't loose any
/// message.
pub fn record_vote(&self, state: &BlockTestData) {
if self.is_approval() {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(),
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
for approval in approvals {
for candidate_index in approval.candidate_indices.iter_ones() {
state
.votes
.get(approval.validator.0 as usize)
.unwrap()
.get(candidate_index)
.unwrap()
.store(true, std::sync::atomic::Ordering::SeqCst);
}
},
}
}
}
/// Mark the assignments in the message as sent.
pub fn record_sent_assignment(
&self,
candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
) {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => {
for (assignment, candidate_indices) in assignments {
for candidate_index in candidate_indices.iter_ones() {
let candidate_test_data = candidates_test_data
.get_mut(&(assignment.block_hash, candidate_index as CandidateIndex))
.unwrap();
candidate_test_data.mark_sent_assignment(self.tranche)
}
}
},
protocol_v3::ApprovalDistributionMessage::Approvals(_approvals) => todo!(),
}
}
/// Returns a list of candidates indicies in this message
pub fn candidate_indices(&self) -> HashSet<usize> {
let mut unique_candidate_indicies = HashSet::new();
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) =>
for (_assignment, candidate_indices) in assignments {
for candidate_index in candidate_indices.iter_ones() {
unique_candidate_indicies.insert(candidate_index);
}
},
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
for approval in approvals {
for candidate_index in approval.candidate_indices.iter_ones() {
unique_candidate_indicies.insert(candidate_index);
}
},
}
unique_candidate_indicies
}
/// Marks this message as no-shows if the number of configured no-shows is above the registered
/// no-shows.
/// Returns true if the message is a no-show.
pub fn no_show_if_required(
&self,
assignments: &[TestMessageInfo],
candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>,
) -> bool {
let mut should_no_show = false;
if self.is_approval() {
let covered_candidates = assignments
.iter()
.map(|assignment| (assignment, assignment.candidate_indices()))
.collect_vec();
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(),
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => {
assert_eq!(approvals.len(), 1);
for approval in approvals {
should_no_show = should_no_show ||
approval.candidate_indices.iter_ones().all(|candidate_index| {
let candidate_test_data = candidates_test_data
.get_mut(&(
approval.block_hash,
candidate_index as CandidateIndex,
))
.unwrap();
let assignment = covered_candidates
.iter()
.find(|(_assignment, candidates)| {
candidates.contains(&candidate_index)
})
.unwrap();
candidate_test_data.should_no_show(assignment.0.tranche)
});
if should_no_show {
for candidate_index in approval.candidate_indices.iter_ones() {
let candidate_test_data = candidates_test_data
.get_mut(&(
approval.block_hash,
candidate_index as CandidateIndex,
))
.unwrap();
let assignment = covered_candidates
.iter()
.find(|(_assignment, candidates)| {
candidates.contains(&candidate_index)
})
.unwrap();
candidate_test_data.record_no_show(assignment.0.tranche)
}
}
}
},
}
}
should_no_show
}
/// Tells if a message is needed for approval
pub fn needed_for_approval(
&self,
candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>,
) -> bool {
match &self.msg {
protocol_v3::ApprovalDistributionMessage::Assignments(assignments) =>
assignments.iter().any(|(assignment, candidate_indices)| {
candidate_indices.iter_ones().any(|candidate_index| {
candidates_test_data
.get(&(assignment.block_hash, candidate_index as CandidateIndex))
.map(|data| data.should_send_tranche(self.tranche))
.unwrap_or_default()
})
}),
protocol_v3::ApprovalDistributionMessage::Approvals(approvals) =>
approvals.iter().any(|approval| {
approval.candidate_indices.iter_ones().any(|candidate_index| {
candidates_test_data
.get(&(approval.block_hash, candidate_index as CandidateIndex))
.map(|data| data.should_send_tranche(self.tranche))
.unwrap_or_default()
})
}),
}
}
/// Splits a message into multiple messages based on what peers should send this message.
/// It build a HashMap of messages that should be sent by each peer.
pub fn split_by_peer_id(
self,
authorities: &TestAuthorities,
) -> HashMap<(ValidatorIndex, PeerId), Vec<TestMessageInfo>> {
let mut result: HashMap<(ValidatorIndex, PeerId), Vec<TestMessageInfo>> = HashMap::new();
for validator_index in &self.sent_by {
let peer = authorities.peer_ids.get(validator_index.0 as usize).unwrap();
result.entry((*validator_index, *peer)).or_default().push(TestMessageInfo {
msg: self.msg.clone(),
sent_by: Default::default(),
tranche: self.tranche,
block_hash: self.block_hash,
});
}
result
}
}
@@ -0,0 +1,42 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{environment::TestEnvironmentDependencies, mock::TestSyncOracle};
use polkadot_node_core_av_store::{AvailabilityStoreSubsystem, Config};
use polkadot_node_metrics::metrics::Metrics;
use polkadot_node_subsystem_util::database::Database;
use std::sync::Arc;
mod columns {
pub const DATA: u32 = 0;
pub const META: u32 = 1;
pub const NUM_COLUMNS: u32 = 2;
}
const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META };
pub fn new_av_store(dependencies: &TestEnvironmentDependencies) -> AvailabilityStoreSubsystem {
let metrics = Metrics::try_register(&dependencies.registry).unwrap();
AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(TestSyncOracle {}), metrics)
}
fn test_store() -> Arc<dyn Database> {
let db = kvdb_memorydb::create(columns::NUM_COLUMNS);
let db =
polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[columns::META]);
Arc::new(db)
}
@@ -0,0 +1,695 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
configuration::TestConfiguration,
dummy_builder,
environment::{TestEnvironment, TestEnvironmentDependencies, GENESIS_HASH},
mock::{
av_store::{self, MockAvailabilityStore},
chain_api::{ChainApiState, MockChainApi},
network_bridge::{self, MockNetworkBridgeRx, MockNetworkBridgeTx},
runtime_api::{self, MockRuntimeApi},
AlwaysSupportsParachains,
},
network::new_network,
usage::BenchmarkUsage,
};
use av_store::NetworkAvailabilityState;
use av_store_helpers::new_av_store;
use bitvec::bitvec;
use colored::Colorize;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use parity_scale_codec::Encode;
use polkadot_availability_bitfield_distribution::BitfieldDistribution;
use polkadot_availability_distribution::{
AvailabilityDistributionSubsystem, IncomingRequestReceivers,
};
use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
use polkadot_node_metrics::metrics::Metrics;
use polkadot_node_network_protocol::{
request_response::{v1::ChunkFetchingRequest, IncomingRequest, ReqProtocolNames},
OurView, Versioned, VersionedValidationProtocol,
};
use polkadot_node_primitives::{AvailableData, BlockData, ErasureChunk, PoV};
use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityRecoveryMessage},
Overseer, OverseerConnector, SpawnGlue,
};
use polkadot_node_subsystem_test_helpers::{
derive_erasure_chunks_with_proofs_and_root, mock::new_block_import_info,
};
use polkadot_node_subsystem_types::{
messages::{AvailabilityStoreMessage, NetworkBridgeEvent},
Span,
};
use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle};
use polkadot_primitives::{
AvailabilityBitfield, BlockNumber, CandidateHash, CandidateReceipt, GroupIndex, Hash, HeadData,
Header, PersistedValidationData, Signed, SigningContext, ValidatorIndex,
};
use polkadot_primitives_test_helpers::{dummy_candidate_receipt, dummy_hash};
use sc_network::{
request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig},
PeerId,
};
use sc_service::SpawnTaskHandle;
use serde::{Deserialize, Serialize};
use sp_core::H256;
use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant};
mod av_store_helpers;
const LOG_TARGET: &str = "subsystem-bench::availability";
#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct DataAvailabilityReadOptions {
#[clap(short, long, default_value_t = false)]
/// Turbo boost AD Read by fetching the full availability datafrom backers first. Saves CPU as
/// we don't need to re-construct from chunks. Tipically this is only faster if nodes have
/// enough bandwidth.
pub fetch_from_backers: bool,
}
pub enum TestDataAvailability {
Read(DataAvailabilityReadOptions),
Write,
}
fn build_overseer_for_availability_read(
spawn_task_handle: SpawnTaskHandle,
runtime_api: MockRuntimeApi,
av_store: MockAvailabilityStore,
network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx),
availability_recovery: AvailabilityRecoverySubsystem,
dependencies: &TestEnvironmentDependencies,
) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
let overseer_connector = OverseerConnector::with_event_capacity(64000);
let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
let builder = dummy
.replace_runtime_api(|_| runtime_api)
.replace_availability_store(|_| av_store)
.replace_network_bridge_tx(|_| network_bridge.0)
.replace_network_bridge_rx(|_| network_bridge.1)
.replace_availability_recovery(|_| availability_recovery);
let (overseer, raw_handle) =
builder.build_with_connector(overseer_connector).expect("Should not fail");
(overseer, OverseerHandle::new(raw_handle))
}
#[allow(clippy::too_many_arguments)]
fn build_overseer_for_availability_write(
spawn_task_handle: SpawnTaskHandle,
runtime_api: MockRuntimeApi,
network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx),
availability_distribution: AvailabilityDistributionSubsystem,
chain_api: MockChainApi,
availability_store: AvailabilityStoreSubsystem,
bitfield_distribution: BitfieldDistribution,
dependencies: &TestEnvironmentDependencies,
) -> (Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>, OverseerHandle) {
let overseer_connector = OverseerConnector::with_event_capacity(64000);
let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap();
let dummy = dummy_builder!(spawn_task_handle, overseer_metrics);
let builder = dummy
.replace_runtime_api(|_| runtime_api)
.replace_availability_store(|_| availability_store)
.replace_network_bridge_tx(|_| network_bridge.0)
.replace_network_bridge_rx(|_| network_bridge.1)
.replace_chain_api(|_| chain_api)
.replace_bitfield_distribution(|_| bitfield_distribution)
// This is needed to test own chunk recovery for `n_cores`.
.replace_availability_distribution(|_| availability_distribution);
let (overseer, raw_handle) =
builder.build_with_connector(overseer_connector).expect("Should not fail");
(overseer, OverseerHandle::new(raw_handle))
}
/// Takes a test configuration and uses it to create the `TestEnvironment`.
pub fn prepare_test(
config: TestConfiguration,
state: &mut TestState,
mode: TestDataAvailability,
with_prometheus_endpoint: bool,
) -> (TestEnvironment, Vec<ProtocolConfig>) {
prepare_test_inner(
config,
state,
mode,
TestEnvironmentDependencies::default(),
with_prometheus_endpoint,
)
}
fn prepare_test_inner(
config: TestConfiguration,
state: &mut TestState,
mode: TestDataAvailability,
dependencies: TestEnvironmentDependencies,
with_prometheus_endpoint: bool,
) -> (TestEnvironment, Vec<ProtocolConfig>) {
// Generate test authorities.
let test_authorities = config.generate_authorities();
let mut candidate_hashes: HashMap<H256, Vec<CandidateReceipt>> = HashMap::new();
// Prepare per block candidates.
// Genesis block is always finalized, so we start at 1.
for block_num in 1..=config.num_blocks {
for _ in 0..config.n_cores {
candidate_hashes
.entry(Hash::repeat_byte(block_num as u8))
.or_default()
.push(state.next_candidate().expect("Cycle iterator"))
}
// First candidate is our backed candidate.
state.backed_candidates.push(
candidate_hashes
.get(&Hash::repeat_byte(block_num as u8))
.expect("just inserted above")
.first()
.expect("just inserted above")
.clone(),
);
}
let runtime_api = runtime_api::MockRuntimeApi::new(
config.clone(),
test_authorities.clone(),
candidate_hashes,
Default::default(),
Default::default(),
0,
);
let availability_state = NetworkAvailabilityState {
candidate_hashes: state.candidate_hashes.clone(),
available_data: state.available_data.clone(),
chunks: state.chunks.clone(),
};
let mut req_cfgs = Vec::new();
let (collation_req_receiver, collation_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
req_cfgs.push(collation_req_cfg);
let (pov_req_receiver, pov_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
let (chunk_req_receiver, chunk_req_cfg) =
IncomingRequest::get_config_receiver(&ReqProtocolNames::new(GENESIS_HASH, None));
req_cfgs.push(pov_req_cfg);
let (network, network_interface, network_receiver) =
new_network(&config, &dependencies, &test_authorities, vec![Arc::new(availability_state)]);
let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new(
network.clone(),
network_interface.subsystem_sender(),
test_authorities.clone(),
);
let network_bridge_rx =
network_bridge::MockNetworkBridgeRx::new(network_receiver, Some(chunk_req_cfg.clone()));
let (overseer, overseer_handle) = match &mode {
TestDataAvailability::Read(options) => {
let use_fast_path = options.fetch_from_backers;
let subsystem = if use_fast_path {
AvailabilityRecoverySubsystem::with_fast_path(
collation_req_receiver,
Metrics::try_register(&dependencies.registry).unwrap(),
)
} else {
AvailabilityRecoverySubsystem::with_chunks_only(
collation_req_receiver,
Metrics::try_register(&dependencies.registry).unwrap(),
)
};
// Use a mocked av-store.
let av_store = av_store::MockAvailabilityStore::new(
state.chunks.clone(),
state.candidate_hashes.clone(),
);
build_overseer_for_availability_read(
dependencies.task_manager.spawn_handle(),
runtime_api,
av_store,
(network_bridge_tx, network_bridge_rx),
subsystem,
&dependencies,
)
},
TestDataAvailability::Write => {
let availability_distribution = AvailabilityDistributionSubsystem::new(
test_authorities.keyring.keystore(),
IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
Metrics::try_register(&dependencies.registry).unwrap(),
);
let block_headers = (1..=config.num_blocks)
.map(|block_number| {
(
Hash::repeat_byte(block_number as u8),
Header {
digest: Default::default(),
number: block_number as BlockNumber,
parent_hash: Default::default(),
extrinsics_root: Default::default(),
state_root: Default::default(),
},
)
})
.collect::<HashMap<_, _>>();
let chain_api_state = ChainApiState { block_headers };
let chain_api = MockChainApi::new(chain_api_state);
let bitfield_distribution =
BitfieldDistribution::new(Metrics::try_register(&dependencies.registry).unwrap());
build_overseer_for_availability_write(
dependencies.task_manager.spawn_handle(),
runtime_api,
(network_bridge_tx, network_bridge_rx),
availability_distribution,
chain_api,
new_av_store(&dependencies),
bitfield_distribution,
&dependencies,
)
},
};
(
TestEnvironment::new(
dependencies,
config,
network,
overseer,
overseer_handle,
test_authorities,
with_prometheus_endpoint,
),
req_cfgs,
)
}
#[derive(Clone)]
pub struct TestState {
// Full test configuration
config: TestConfiguration,
// A cycle iterator on all PoV sizes used in the test.
pov_sizes: Cycle<std::vec::IntoIter<usize>>,
// Generated candidate receipts to be used in the test
candidates: Cycle<std::vec::IntoIter<CandidateReceipt>>,
// Map from pov size to candidate index
pov_size_to_candidate: HashMap<usize, usize>,
// Map from generated candidate hashes to candidate index in `available_data`
// and `chunks`.
candidate_hashes: HashMap<CandidateHash, usize>,
// Per candidate index receipts.
candidate_receipt_templates: Vec<CandidateReceipt>,
// Per candidate index `AvailableData`
available_data: Vec<AvailableData>,
// Per candiadte index chunks
chunks: Vec<Vec<ErasureChunk>>,
// Per relay chain block - candidate backed by our backing group
backed_candidates: Vec<CandidateReceipt>,
}
impl TestState {
pub fn next_candidate(&mut self) -> Option<CandidateReceipt> {
let candidate = self.candidates.next();
let candidate_hash = candidate.as_ref().unwrap().hash();
gum::trace!(target: LOG_TARGET, "Next candidate selected {:?}", candidate_hash);
candidate
}
/// Generate candidates to be used in the test.
fn generate_candidates(&mut self) {
let count = self.config.n_cores * self.config.num_blocks;
gum::info!(target: LOG_TARGET,"{}", format!("Pre-generating {} candidates.", count).bright_blue());
// Generate all candidates
self.candidates = (0..count)
.map(|index| {
let pov_size = self.pov_sizes.next().expect("This is a cycle; qed");
let candidate_index = *self
.pov_size_to_candidate
.get(&pov_size)
.expect("pov_size always exists; qed");
let mut candidate_receipt =
self.candidate_receipt_templates[candidate_index].clone();
// Make it unique.
candidate_receipt.descriptor.relay_parent = Hash::from_low_u64_be(index as u64);
// Store the new candidate in the state
self.candidate_hashes.insert(candidate_receipt.hash(), candidate_index);
gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_receipt.hash(), "new candidate");
candidate_receipt
})
.collect::<Vec<_>>()
.into_iter()
.cycle();
}
pub fn new(config: &TestConfiguration) -> Self {
let config = config.clone();
let mut chunks = Vec::new();
let mut available_data = Vec::new();
let mut candidate_receipt_templates = Vec::new();
let mut pov_size_to_candidate = HashMap::new();
// we use it for all candidates.
let persisted_validation_data = PersistedValidationData {
parent_head: HeadData(vec![7, 8, 9]),
relay_parent_number: Default::default(),
max_pov_size: 1024,
relay_parent_storage_root: Default::default(),
};
// For each unique pov we create a candidate receipt.
for (index, pov_size) in config.pov_sizes().iter().cloned().unique().enumerate() {
gum::info!(target: LOG_TARGET, index, pov_size, "{}", "Generating template candidate".bright_blue());
let mut candidate_receipt = dummy_candidate_receipt(dummy_hash());
let pov = PoV { block_data: BlockData(vec![index as u8; pov_size]) };
let new_available_data = AvailableData {
validation_data: persisted_validation_data.clone(),
pov: Arc::new(pov),
};
let (new_chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
config.n_validators,
&new_available_data,
|_, _| {},
);
candidate_receipt.descriptor.erasure_root = erasure_root;
chunks.push(new_chunks);
available_data.push(new_available_data);
pov_size_to_candidate.insert(pov_size, index);
candidate_receipt_templates.push(candidate_receipt);
}
gum::info!(target: LOG_TARGET, "{}","Created test environment.".bright_blue());
let mut _self = Self {
available_data,
candidate_receipt_templates,
chunks,
pov_size_to_candidate,
pov_sizes: Vec::from(config.pov_sizes()).into_iter().cycle(),
candidate_hashes: HashMap::new(),
candidates: Vec::new().into_iter().cycle(),
backed_candidates: Vec::new(),
config,
};
_self.generate_candidates();
_self
}
pub fn backed_candidates(&mut self) -> &mut Vec<CandidateReceipt> {
&mut self.backed_candidates
}
}
pub async fn benchmark_availability_read(
benchmark_name: &str,
env: &mut TestEnvironment,
mut state: TestState,
) -> BenchmarkUsage {
let config = env.config().clone();
env.import_block(new_block_import_info(Hash::repeat_byte(1), 1)).await;
let test_start = Instant::now();
let mut batch = FuturesUnordered::new();
let mut availability_bytes = 0u128;
env.metrics().set_n_validators(config.n_validators);
env.metrics().set_n_cores(config.n_cores);
for block_num in 1..=env.config().num_blocks {
gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num, env.config().num_blocks);
env.metrics().set_current_block(block_num);
let block_start_ts = Instant::now();
for candidate_num in 0..config.n_cores as u64 {
let candidate =
state.next_candidate().expect("We always send up to n_cores*num_blocks; qed");
let (tx, rx) = oneshot::channel();
batch.push(rx);
let message = AllMessages::AvailabilityRecovery(
AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.clone(),
1,
Some(GroupIndex(
candidate_num as u32 % (std::cmp::max(5, config.n_cores) / 5) as u32,
)),
tx,
),
);
env.send_message(message).await;
}
gum::info!(target: LOG_TARGET, "{}", format!("{} recoveries pending", batch.len()).bright_black());
while let Some(completed) = batch.next().await {
let available_data = completed.unwrap().unwrap();
env.metrics().on_pov_size(available_data.encoded_size());
availability_bytes += available_data.encoded_size() as u128;
}
let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
env.metrics().set_block_time(block_time);
gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{:?}ms", block_time).cyan());
}
let duration: u128 = test_start.elapsed().as_millis();
let availability_bytes = availability_bytes / 1024;
gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan());
gum::info!(target: LOG_TARGET,
"Throughput: {}",
format!("{} KiB/block", availability_bytes / env.config().num_blocks as u128).bright_red()
);
gum::info!(target: LOG_TARGET,
"Avg block time: {}",
format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
);
env.stop().await;
env.collect_resource_usage(benchmark_name, &["availability-recovery"])
}
pub async fn benchmark_availability_write(
benchmark_name: &str,
env: &mut TestEnvironment,
mut state: TestState,
) -> BenchmarkUsage {
let config = env.config().clone();
env.metrics().set_n_validators(config.n_validators);
env.metrics().set_n_cores(config.n_cores);
gum::info!(target: LOG_TARGET, "Seeding availability store with candidates ...");
for backed_candidate in state.backed_candidates().clone() {
let candidate_index = *state.candidate_hashes.get(&backed_candidate.hash()).unwrap();
let available_data = state.available_data[candidate_index].clone();
let (tx, rx) = oneshot::channel();
env.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData {
candidate_hash: backed_candidate.hash(),
n_validators: config.n_validators as u32,
available_data,
expected_erasure_root: backed_candidate.descriptor().erasure_root,
tx,
},
))
.await;
rx.await
.unwrap()
.expect("Test candidates are stored nicely in availability store");
}
gum::info!(target: LOG_TARGET, "Done");
let test_start = Instant::now();
for block_num in 1..=env.config().num_blocks {
gum::info!(target: LOG_TARGET, "Current block #{}", block_num);
env.metrics().set_current_block(block_num);
let block_start_ts = Instant::now();
let relay_block_hash = Hash::repeat_byte(block_num as u8);
env.import_block(new_block_import_info(relay_block_hash, block_num as BlockNumber))
.await;
// Inform bitfield distribution about our view of current test block
let message = polkadot_node_subsystem_types::messages::BitfieldDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::OurViewChange(OurView::new(vec![(relay_block_hash, Arc::new(Span::Disabled))], 0))
);
env.send_message(AllMessages::BitfieldDistribution(message)).await;
let chunk_fetch_start_ts = Instant::now();
// Request chunks of our own backed candidate from all other validators.
let mut receivers = Vec::new();
for index in 1..config.n_validators {
let (pending_response, pending_response_receiver) = oneshot::channel();
let request = RawIncomingRequest {
peer: PeerId::random(),
payload: ChunkFetchingRequest {
candidate_hash: state.backed_candidates()[block_num - 1].hash(),
index: ValidatorIndex(index as u32),
}
.encode(),
pending_response,
};
let peer = env
.authorities()
.validator_authority_id
.get(index)
.expect("all validators have keys");
if env.network().is_peer_connected(peer) &&
env.network().send_request_from_peer(peer, request).is_ok()
{
receivers.push(pending_response_receiver);
}
}
gum::info!(target: LOG_TARGET, "Waiting for all emulated peers to receive their chunk from us ...");
for receiver in receivers.into_iter() {
let response = receiver.await.expect("Chunk is always served succesfully");
// TODO: check if chunk is the one the peer expects to receive.
assert!(response.result.is_ok());
}
let chunk_fetch_duration = Instant::now().sub(chunk_fetch_start_ts).as_millis();
gum::info!(target: LOG_TARGET, "All chunks received in {}ms", chunk_fetch_duration);
let signing_context = SigningContext { session_index: 0, parent_hash: relay_block_hash };
let network = env.network().clone();
let authorities = env.authorities().clone();
let n_validators = config.n_validators;
// Spawn a task that will generate `n_validator` - 1 signed bitfiends and
// send them from the emulated peers to the subsystem.
// TODO: Implement topology.
env.spawn_blocking("send-bitfields", async move {
for index in 1..n_validators {
let validator_public =
authorities.validator_public.get(index).expect("All validator keys are known");
// Node has all the chunks in the world.
let payload: AvailabilityBitfield =
AvailabilityBitfield(bitvec![u8, bitvec::order::Lsb0; 1u8; 32]);
// TODO(soon): Use pre-signed messages. This is quite intensive on the CPU.
let signed_bitfield = Signed::<AvailabilityBitfield>::sign(
&authorities.keyring.keystore(),
payload,
&signing_context,
ValidatorIndex(index as u32),
validator_public,
)
.ok()
.flatten()
.expect("should be signed");
let from_peer = &authorities.validator_authority_id[index];
let message = peer_bitfield_message_v2(relay_block_hash, signed_bitfield);
// Send the action from peer only if it is connected to our node.
if network.is_peer_connected(from_peer) {
let _ = network.send_message_from_peer(from_peer, message);
}
}
});
gum::info!(
"Waiting for {} bitfields to be received and processed",
config.connected_count()
);
// Wait for all bitfields to be processed.
env.wait_until_metric(
"polkadot_parachain_received_availabilty_bitfields_total",
None,
|value| value == (config.connected_count() * block_num) as f64,
)
.await;
gum::info!(target: LOG_TARGET, "All bitfields processed");
let block_time = Instant::now().sub(block_start_ts).as_millis() as u64;
env.metrics().set_block_time(block_time);
gum::info!(target: LOG_TARGET, "All work for block completed in {}", format!("{:?}ms", block_time).cyan());
}
let duration: u128 = test_start.elapsed().as_millis();
gum::info!(target: LOG_TARGET, "All blocks processed in {}", format!("{:?}ms", duration).cyan());
gum::info!(target: LOG_TARGET,
"Avg block time: {}",
format!("{} ms", test_start.elapsed().as_millis() / env.config().num_blocks as u128).red()
);
env.stop().await;
env.collect_resource_usage(
benchmark_name,
&["availability-distribution", "bitfield-distribution", "availability-store"],
)
}
pub fn peer_bitfield_message_v2(
relay_hash: H256,
signed_bitfield: Signed<AvailabilityBitfield>,
) -> VersionedValidationProtocol {
let bitfield = polkadot_node_network_protocol::v2::BitfieldDistributionMessage::Bitfield(
relay_hash,
signed_bitfield.into(),
);
Versioned::V2(polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
bitfield,
))
}
@@ -0,0 +1,243 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test configuration definition and helpers.
use crate::keyring::Keyring;
use itertools::Itertools;
use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId};
use rand::thread_rng;
use rand_distr::{Distribution, Normal, Uniform};
use sc_network::PeerId;
use serde::{Deserialize, Serialize};
use sp_consensus_babe::AuthorityId;
use std::collections::HashMap;
/// Peer networking latency configuration.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PeerLatency {
/// The mean latency(milliseconds) of the peers.
pub mean_latency_ms: usize,
/// The standard deviation
pub std_dev: f64,
}
// Default PoV size in KiB.
fn default_pov_size() -> usize {
5120
}
// Default bandwidth in bytes
fn default_bandwidth() -> usize {
52428800
}
// Default connectivity percentage
fn default_connectivity() -> usize {
100
}
// Default backing group size
fn default_backing_group_size() -> usize {
5
}
// Default needed approvals
fn default_needed_approvals() -> usize {
30
}
fn default_zeroth_delay_tranche_width() -> usize {
0
}
fn default_relay_vrf_modulo_samples() -> usize {
6
}
fn default_n_delay_tranches() -> usize {
89
}
fn default_no_show_slots() -> usize {
3
}
/// The test input parameters
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TestConfiguration {
/// Number of validators
pub n_validators: usize,
/// Number of cores
pub n_cores: usize,
/// The number of needed votes to approve a candidate.
#[serde(default = "default_needed_approvals")]
pub needed_approvals: usize,
#[serde(default = "default_zeroth_delay_tranche_width")]
pub zeroth_delay_tranche_width: usize,
#[serde(default = "default_relay_vrf_modulo_samples")]
pub relay_vrf_modulo_samples: usize,
#[serde(default = "default_n_delay_tranches")]
pub n_delay_tranches: usize,
#[serde(default = "default_no_show_slots")]
pub no_show_slots: usize,
/// Maximum backing group size
#[serde(default = "default_backing_group_size")]
pub max_validators_per_core: usize,
/// The min PoV size
#[serde(default = "default_pov_size")]
pub min_pov_size: usize,
/// The max PoV size,
#[serde(default = "default_pov_size")]
pub max_pov_size: usize,
/// Randomly sampled pov_sizes
#[serde(skip)]
pub pov_sizes: Vec<usize>,
/// The amount of bandiwdth remote validators have.
#[serde(default = "default_bandwidth")]
pub peer_bandwidth: usize,
/// The amount of bandiwdth our node has.
#[serde(default = "default_bandwidth")]
pub bandwidth: usize,
/// Optional peer emulation latency (round trip time) wrt node under test
#[serde(default)]
pub latency: Option<PeerLatency>,
/// Connectivity ratio, the percentage of peers we are not connected to, but ar part of
/// the topology.
#[serde(default = "default_connectivity")]
pub connectivity: usize,
/// Number of blocks to run the test for
pub num_blocks: usize,
}
impl Default for TestConfiguration {
fn default() -> Self {
Self {
n_validators: Default::default(),
n_cores: Default::default(),
needed_approvals: default_needed_approvals(),
zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(),
relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(),
n_delay_tranches: default_n_delay_tranches(),
no_show_slots: default_no_show_slots(),
max_validators_per_core: default_backing_group_size(),
min_pov_size: default_pov_size(),
max_pov_size: default_pov_size(),
pov_sizes: Default::default(),
peer_bandwidth: default_bandwidth(),
bandwidth: default_bandwidth(),
latency: Default::default(),
connectivity: default_connectivity(),
num_blocks: Default::default(),
}
}
}
impl TestConfiguration {
pub fn generate_pov_sizes(&mut self) {
self.pov_sizes = generate_pov_sizes(self.n_cores, self.min_pov_size, self.max_pov_size);
}
pub fn pov_sizes(&self) -> &[usize] {
&self.pov_sizes
}
/// Return the number of peers connected to our node.
pub fn connected_count(&self) -> usize {
((self.n_validators - 1) as f64 / (100.0 / self.connectivity as f64)) as usize
}
/// Generates the authority keys we need for the network emulation.
pub fn generate_authorities(&self) -> TestAuthorities {
let keyring = Keyring::default();
let key_seeds = (0..self.n_validators)
.map(|peer_index| format!("//Node{}", peer_index))
.collect_vec();
let keys = key_seeds
.iter()
.map(|seed| keyring.sr25519_new(seed.as_str()))
.collect::<Vec<_>>();
// Generate keys and peers ids in each of the format needed by the tests.
let validator_public: Vec<ValidatorId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let validator_authority_id: Vec<AuthorityDiscoveryId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let validator_babe_id: Vec<AuthorityId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let validator_assignment_id: Vec<AssignmentId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let peer_ids: Vec<PeerId> = keys.iter().map(|_| PeerId::random()).collect::<Vec<_>>();
let peer_id_to_authority = peer_ids
.iter()
.zip(validator_authority_id.iter())
.map(|(peer_id, authorithy_id)| (*peer_id, authorithy_id.clone()))
.collect();
TestAuthorities {
keyring,
validator_public,
validator_authority_id,
peer_ids,
validator_babe_id,
validator_assignment_id,
key_seeds,
peer_id_to_authority,
}
}
}
fn random_uniform_sample<T: Into<usize> + From<usize>>(min_value: T, max_value: T) -> T {
Uniform::from(min_value.into()..=max_value.into())
.sample(&mut thread_rng())
.into()
}
fn random_pov_size(min_pov_size: usize, max_pov_size: usize) -> usize {
random_uniform_sample(min_pov_size, max_pov_size)
}
fn generate_pov_sizes(count: usize, min_kib: usize, max_kib: usize) -> Vec<usize> {
(0..count).map(|_| random_pov_size(min_kib * 1024, max_kib * 1024)).collect()
}
/// Helper struct for authority related state.
#[derive(Clone)]
pub struct TestAuthorities {
pub keyring: Keyring,
pub validator_public: Vec<ValidatorId>,
pub validator_authority_id: Vec<AuthorityDiscoveryId>,
pub validator_babe_id: Vec<AuthorityId>,
pub validator_assignment_id: Vec<AssignmentId>,
pub key_seeds: Vec<String>,
pub peer_ids: Vec<PeerId>,
pub peer_id_to_authority: HashMap<PeerId, AuthorityDiscoveryId>,
}
/// Sample latency (in milliseconds) from a normal distribution with parameters
/// specified in `maybe_peer_latency`.
pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> usize {
maybe_peer_latency
.map(|latency_config| {
Normal::new(latency_config.mean_latency_ms as f64, latency_config.std_dev)
.expect("normal distribution parameters are good")
.sample(&mut thread_rng())
})
.unwrap_or(0.0) as usize
}
@@ -0,0 +1,202 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Display implementations and helper methods for parsing prometheus metrics
//! to a format that can be displayed in the CLI.
//!
//! Currently histogram buckets are skipped.
use crate::configuration::TestConfiguration;
use colored::Colorize;
use prometheus::{
proto::{MetricFamily, MetricType},
Registry,
};
use std::fmt::Display;
const LOG_TARGET: &str = "subsystem-bench::display";
#[derive(Default, Debug)]
pub struct MetricCollection(Vec<TestMetric>);
impl From<Vec<TestMetric>> for MetricCollection {
fn from(metrics: Vec<TestMetric>) -> Self {
MetricCollection(metrics)
}
}
impl MetricCollection {
pub fn all(&self) -> &Vec<TestMetric> {
&self.0
}
/// Sums up all metrics with the given name in the collection
pub fn sum_by(&self, name: &str) -> f64 {
self.all()
.iter()
.filter(|metric| metric.name == name)
.map(|metric| metric.value)
.sum()
}
/// Tells if entries in bucket metric is lower than `value`
pub fn metric_lower_than(&self, metric_name: &str, value: f64) -> bool {
self.sum_by(metric_name) < value
}
pub fn subset_with_label_value(&self, label_name: &str, label_value: &str) -> MetricCollection {
self.0
.iter()
.filter_map(|metric| {
if let Some(index) = metric.label_names.iter().position(|label| label == label_name)
{
if Some(&String::from(label_value)) == metric.label_values.get(index) {
Some(metric.clone())
} else {
None
}
} else {
None
}
})
.collect::<Vec<_>>()
.into()
}
}
impl Display for MetricCollection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let metrics = self.all();
for metric in metrics {
writeln!(f, "{}", metric)?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct TestMetric {
name: String,
label_names: Vec<String>,
label_values: Vec<String>,
value: f64,
}
impl Display for TestMetric {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"({} = {}) [{:?}, {:?}]",
self.name.cyan(),
format!("{}", self.value).white(),
self.label_names,
self.label_values
)
}
}
// Returns `false` if metric should be skipped.
fn check_metric_family(mf: &MetricFamily) -> bool {
if mf.get_metric().is_empty() {
gum::error!(target: LOG_TARGET, "MetricFamily has no metrics: {:?}", mf);
return false
}
if mf.get_name().is_empty() {
gum::error!(target: LOG_TARGET, "MetricFamily has no name: {:?}", mf);
return false
}
true
}
pub fn parse_metrics(registry: &Registry) -> MetricCollection {
let metric_families = registry.gather();
let mut test_metrics = Vec::new();
for mf in metric_families {
if !check_metric_family(&mf) {
continue
}
let name: String = mf.get_name().into();
let metric_type = mf.get_field_type();
for m in mf.get_metric() {
let (label_names, label_values): (Vec<String>, Vec<String>) = m
.get_label()
.iter()
.map(|pair| (String::from(pair.get_name()), String::from(pair.get_value())))
.unzip();
match metric_type {
MetricType::COUNTER => {
test_metrics.push(TestMetric {
name: name.clone(),
label_names,
label_values,
value: m.get_counter().get_value(),
});
},
MetricType::GAUGE => {
test_metrics.push(TestMetric {
name: name.clone(),
label_names,
label_values,
value: m.get_gauge().get_value(),
});
},
MetricType::HISTOGRAM => {
let h = m.get_histogram();
let h_name = name.clone() + "_sum";
test_metrics.push(TestMetric {
name: h_name,
label_names: label_names.clone(),
label_values: label_values.clone(),
value: h.get_sample_sum(),
});
let h_name = name.clone() + "_count";
test_metrics.push(TestMetric {
name: h_name,
label_names,
label_values,
value: h.get_sample_count() as f64,
});
},
MetricType::SUMMARY => {
unimplemented!();
},
MetricType::UNTYPED => {
unimplemented!();
},
}
}
}
test_metrics.into()
}
impl Display for TestConfiguration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}, {}, {}, {}, {}",
format!("n_validators = {}", self.n_validators).blue(),
format!("n_cores = {}", self.n_cores).blue(),
format!("pov_size = {} - {}", self.min_pov_size, self.max_pov_size).bright_black(),
format!("connectivity = {}", self.connectivity).bright_black(),
format!("latency = {:?}", self.latency).bright_black(),
)
}
}
@@ -0,0 +1,413 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test environment implementation
use crate::{
configuration::{TestAuthorities, TestConfiguration},
mock::AlwaysSupportsParachains,
network::NetworkEmulatorHandle,
usage::{BenchmarkUsage, ResourceUsage},
};
use core::time::Duration;
use futures::{Future, FutureExt};
use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
use polkadot_node_subsystem_types::Hash;
use polkadot_node_subsystem_util::metrics::prometheus::{
self, Gauge, Histogram, PrometheusError, Registry, U64,
};
use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
use sc_service::{SpawnTaskHandle, TaskManager};
use std::net::{Ipv4Addr, SocketAddr};
use tokio::runtime::Handle;
const LOG_TARGET: &str = "subsystem-bench::environment";
/// Test environment/configuration metrics
#[derive(Clone)]
pub struct TestEnvironmentMetrics {
/// Number of bytes sent per peer.
n_validators: Gauge<U64>,
/// Number of received sent per peer.
n_cores: Gauge<U64>,
/// PoV size
pov_size: Histogram,
/// Current block
current_block: Gauge<U64>,
/// Current block
block_time: Gauge<U64>,
}
impl TestEnvironmentMetrics {
pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
.expect("arguments are always valid; qed");
Ok(Self {
n_validators: prometheus::register(
Gauge::new(
"subsystem_benchmark_n_validators",
"Total number of validators in the test",
)?,
registry,
)?,
n_cores: prometheus::register(
Gauge::new(
"subsystem_benchmark_n_cores",
"Number of cores we fetch availability for each block",
)?,
registry,
)?,
current_block: prometheus::register(
Gauge::new("subsystem_benchmark_current_block", "The current test block")?,
registry,
)?,
block_time: prometheus::register(
Gauge::new("subsystem_benchmark_block_time", "The time it takes for the target subsystems(s) to complete all the requests in a block")?,
registry,
)?,
pov_size: prometheus::register(
Histogram::with_opts(
prometheus::HistogramOpts::new(
"subsystem_benchmark_pov_size",
"The compressed size of the proof of validity of a candidate",
)
.buckets(buckets),
)?,
registry,
)?,
})
}
pub fn set_n_validators(&self, n_validators: usize) {
self.n_validators.set(n_validators as u64);
}
pub fn set_n_cores(&self, n_cores: usize) {
self.n_cores.set(n_cores as u64);
}
pub fn set_current_block(&self, current_block: usize) {
self.current_block.set(current_block as u64);
}
pub fn set_block_time(&self, block_time_ms: u64) {
self.block_time.set(block_time_ms);
}
pub fn on_pov_size(&self, pov_size: usize) {
self.pov_size.observe(pov_size as f64);
}
}
fn new_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.thread_name("subsystem-bench")
.enable_all()
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap()
}
/// Wrapper for dependencies
pub struct TestEnvironmentDependencies {
pub registry: Registry,
pub task_manager: TaskManager,
pub runtime: tokio::runtime::Runtime,
}
impl Default for TestEnvironmentDependencies {
fn default() -> Self {
let runtime = new_runtime();
let registry = Registry::new();
let task_manager: TaskManager =
TaskManager::new(runtime.handle().clone(), Some(&registry)).unwrap();
Self { runtime, registry, task_manager }
}
}
// A dummy genesis hash
pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
// We use this to bail out sending messages to the subsystem if it is overloaded such that
// the time of flight is breaches 5s.
// This should eventually be a test parameter.
pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
/// The test environment is the high level wrapper of all things required to test
/// a certain subsystem.
///
/// ## Mockups
/// The overseer is passed in during construction and it can host an arbitrary number of
/// real subsystems instances and the corresponding mocked instances such that the real
/// subsystems can get their messages answered.
///
/// As the subsystem's performance depends on network connectivity, the test environment
/// emulates validator nodes on the network, see `NetworkEmulator`. The network emulation
/// is configurable in terms of peer bandwidth, latency and connection error rate using
/// uniform distribution sampling.
///
///
/// ## Usage
/// `TestEnvironment` is used in tests to send `Overseer` messages or signals to the subsystem
/// under test.
///
/// ## Collecting test metrics
///
/// ### Prometheus
/// A prometheus endpoint is exposed while the test is running. A local Prometheus instance
/// can scrape it every 1s and a Grafana dashboard is the preferred way of visualizing
/// the performance characteristics of the subsystem.
///
/// ### CLI
/// A subset of the Prometheus metrics are printed at the end of the test.
pub struct TestEnvironment {
/// Test dependencies
dependencies: TestEnvironmentDependencies,
/// A runtime handle
runtime_handle: tokio::runtime::Handle,
/// A handle to the lovely overseer
overseer_handle: OverseerHandle,
/// The test configuration.
config: TestConfiguration,
/// A handle to the network emulator.
network: NetworkEmulatorHandle,
/// Configuration/env metrics
metrics: TestEnvironmentMetrics,
/// Test authorities generated from the configuration.
authorities: TestAuthorities,
}
impl TestEnvironment {
/// Create a new test environment
pub fn new(
dependencies: TestEnvironmentDependencies,
config: TestConfiguration,
network: NetworkEmulatorHandle,
overseer: Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>,
overseer_handle: OverseerHandle,
authorities: TestAuthorities,
with_prometheus_endpoint: bool,
) -> Self {
let metrics = TestEnvironmentMetrics::new(&dependencies.registry)
.expect("Metrics need to be registered");
let spawn_handle = dependencies.task_manager.spawn_handle();
spawn_handle.spawn_blocking("overseer", "overseer", overseer.run().boxed());
if with_prometheus_endpoint {
let registry_clone = dependencies.registry.clone();
dependencies.task_manager.spawn_handle().spawn_blocking(
"prometheus",
"test-environment",
async move {
prometheus_endpoint::init_prometheus(
SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), 9999),
registry_clone,
)
.await
.unwrap();
},
);
}
TestEnvironment {
runtime_handle: dependencies.runtime.handle().clone(),
dependencies,
overseer_handle,
config,
network,
metrics,
authorities,
}
}
/// Returns the test configuration.
pub fn config(&self) -> &TestConfiguration {
&self.config
}
/// Returns a reference to the inner network emulator handle.
pub fn network(&self) -> &NetworkEmulatorHandle {
&self.network
}
/// Returns a reference to the overseer handle.
pub fn overseer_handle(&self) -> &OverseerHandle {
&self.overseer_handle
}
/// Returns the Prometheus registry.
pub fn registry(&self) -> &Registry {
&self.dependencies.registry
}
/// Spawn a named task in the `test-environment` task group.
#[allow(unused)]
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.dependencies
.task_manager
.spawn_handle()
.spawn(name, "test-environment", task);
}
/// Spawn a blocking named task in the `test-environment` task group.
pub fn spawn_blocking(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) {
self.dependencies.task_manager.spawn_handle().spawn_blocking(
name,
"test-environment",
task,
);
}
/// Returns a reference to the test environment metrics instance
pub fn metrics(&self) -> &TestEnvironmentMetrics {
&self.metrics
}
/// Returns a handle to the tokio runtime.
pub fn runtime(&self) -> Handle {
self.runtime_handle.clone()
}
/// Returns a reference to the authority keys used in the test.
pub fn authorities(&self) -> &TestAuthorities {
&self.authorities
}
/// Send a message to the subsystem under test environment.
pub async fn send_message(&mut self, msg: AllMessages) {
self.overseer_handle
.send_msg(msg, LOG_TARGET)
.timeout(MAX_TIME_OF_FLIGHT)
.await
.unwrap_or_else(|| {
panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
});
}
/// Send an `ActiveLeavesUpdate` signal to all subsystems under test.
pub async fn import_block(&mut self, block: BlockInfo) {
self.overseer_handle
.block_imported(block)
.timeout(MAX_TIME_OF_FLIGHT)
.await
.unwrap_or_else(|| {
panic!("{}ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis())
});
}
/// Stop overseer and subsystems.
pub async fn stop(&mut self) {
self.overseer_handle.stop().await;
}
/// Tells if entries in bucket metric is lower than `value`
pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool {
let test_metrics = super::display::parse_metrics(registry);
test_metrics.metric_lower_than(metric_name, value)
}
/// Blocks until `metric_name` >= `value`
pub async fn wait_until_metric(
&self,
metric_name: &str,
label: Option<(&str, &str)>,
condition: impl Fn(f64) -> bool,
) {
loop {
let test_metrics = if let Some((label_name, label_value)) = label {
super::display::parse_metrics(self.registry())
.subset_with_label_value(label_name, label_value)
} else {
super::display::parse_metrics(self.registry())
};
let current_value = test_metrics.sum_by(metric_name);
gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric");
if condition(current_value) {
break
}
// Check value every 50ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
pub fn collect_resource_usage(
&self,
benchmark_name: &str,
subsystems_under_test: &[&str],
) -> BenchmarkUsage {
BenchmarkUsage {
benchmark_name: benchmark_name.to_string(),
network_usage: self.network_usage(),
cpu_usage: self.cpu_usage(subsystems_under_test),
}
}
fn network_usage(&self) -> Vec<ResourceUsage> {
let stats = self.network().peer_stats(0);
let total_node_received = (stats.received() / 1024) as f64;
let total_node_sent = (stats.sent() / 1024) as f64;
let num_blocks = self.config().num_blocks as f64;
vec![
ResourceUsage {
resource_name: "Received from peers".to_string(),
total: total_node_received,
per_block: total_node_received / num_blocks,
},
ResourceUsage {
resource_name: "Sent to peers".to_string(),
total: total_node_sent,
per_block: total_node_sent / num_blocks,
},
]
}
fn cpu_usage(&self, subsystems_under_test: &[&str]) -> Vec<ResourceUsage> {
let test_metrics = super::display::parse_metrics(self.registry());
let mut usage = vec![];
let num_blocks = self.config().num_blocks as f64;
for subsystem in subsystems_under_test.iter() {
let subsystem_cpu_metrics =
test_metrics.subset_with_label_value("task_group", subsystem);
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
usage.push(ResourceUsage {
resource_name: subsystem.to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
}
let test_env_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "test-environment");
let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
usage.push(ResourceUsage {
resource_name: "Test environment".to_string(),
total: total_cpu,
per_block: total_cpu / num_blocks,
});
usage
}
}
@@ -0,0 +1,50 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_primitives::ValidatorId;
use sc_keystore::LocalKeystore;
use sp_application_crypto::AppCrypto;
use sp_core::sr25519::Public;
use sp_keystore::Keystore;
use std::sync::Arc;
/// Set of test accounts generated and kept safe by a keystore.
#[derive(Clone)]
pub struct Keyring {
keystore: Arc<LocalKeystore>,
}
impl Default for Keyring {
fn default() -> Self {
Self { keystore: Arc::new(LocalKeystore::in_memory()) }
}
}
impl Keyring {
pub fn sr25519_new(&self, seed: &str) -> Public {
self.keystore
.sr25519_generate_new(ValidatorId::ID, Some(seed))
.expect("Insert key into keystore")
}
pub fn keystore(&self) -> Arc<dyn Keystore> {
self.keystore.clone()
}
pub fn keystore_ref(&self) -> &LocalKeystore {
self.keystore.as_ref()
}
}
@@ -0,0 +1,28 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
// The validator index that represent the node that is under test.
pub const NODE_UNDER_TEST: u32 = 0;
pub mod approval;
pub mod availability;
pub mod configuration;
pub(crate) mod display;
pub(crate) mod environment;
pub(crate) mod keyring;
pub(crate) mod mock;
pub(crate) mod network;
pub mod usage;
@@ -0,0 +1,211 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A generic av store subsystem mockup suitable to be used in benchmarks.
use crate::network::{HandleNetworkMessage, NetworkMessage};
use futures::{channel::oneshot, FutureExt};
use parity_scale_codec::Encode;
use polkadot_node_network_protocol::request_response::{
v1::{AvailableDataFetchingResponse, ChunkFetchingResponse, ChunkResponse},
Requests,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem::{
messages::AvailabilityStoreMessage, overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::OverseerSignal;
use polkadot_primitives::CandidateHash;
use sc_network::ProtocolName;
use std::collections::HashMap;
pub struct AvailabilityStoreState {
candidate_hashes: HashMap<CandidateHash, usize>,
chunks: Vec<Vec<ErasureChunk>>,
}
const LOG_TARGET: &str = "subsystem-bench::av-store-mock";
/// Mockup helper. Contains Ccunks and full availability data of all parachain blocks
/// used in a test.
pub struct NetworkAvailabilityState {
pub candidate_hashes: HashMap<CandidateHash, usize>,
pub available_data: Vec<AvailableData>,
pub chunks: Vec<Vec<ErasureChunk>>,
}
// Implement access to the state.
impl HandleNetworkMessage for NetworkAvailabilityState {
fn handle(
&self,
message: NetworkMessage,
_node_sender: &mut futures::channel::mpsc::UnboundedSender<NetworkMessage>,
) -> Option<NetworkMessage> {
match message {
NetworkMessage::RequestFromNode(peer, request) => match request {
Requests::ChunkFetchingV1(outgoing_request) => {
gum::debug!(target: LOG_TARGET, request = ?outgoing_request, "Received `RequestFromNode`");
let validator_index: usize = outgoing_request.payload.index.0 as usize;
let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = self
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let chunk: ChunkResponse =
self.chunks.get(*candidate_index).unwrap()[validator_index].clone().into();
let response = Ok((
ChunkFetchingResponse::from(Some(chunk)).encode(),
ProtocolName::Static("dummy"),
));
if let Err(err) = outgoing_request.pending_response.send(response) {
gum::error!(target: LOG_TARGET, ?err, "Failed to send `ChunkFetchingResponse`");
}
None
},
Requests::AvailableDataFetchingV1(outgoing_request) => {
let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = self
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let available_data = self.available_data.get(*candidate_index).unwrap().clone();
let response = Ok((
AvailableDataFetchingResponse::from(Some(available_data)).encode(),
ProtocolName::Static("dummy"),
));
outgoing_request
.pending_response
.send(response)
.expect("Response is always sent succesfully");
None
},
_ => Some(NetworkMessage::RequestFromNode(peer, request)),
},
message => Some(message),
}
}
}
/// A mock of the availability store subsystem. This one also generates all the
/// candidates that a
pub struct MockAvailabilityStore {
state: AvailabilityStoreState,
}
impl MockAvailabilityStore {
pub fn new(
chunks: Vec<Vec<ErasureChunk>>,
candidate_hashes: HashMap<CandidateHash, usize>,
) -> MockAvailabilityStore {
Self { state: AvailabilityStoreState { chunks, candidate_hashes } }
}
async fn respond_to_query_all_request(
&self,
candidate_hash: CandidateHash,
send_chunk: impl Fn(usize) -> bool,
tx: oneshot::Sender<Vec<ErasureChunk>>,
) {
let candidate_index = self
.state
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let v = self
.state
.chunks
.get(*candidate_index)
.unwrap()
.iter()
.filter(|c| send_chunk(c.index.0 as usize))
.cloned()
.collect();
let _ = tx.send(v);
}
}
#[overseer::subsystem(AvailabilityStore, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockAvailabilityStore {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "test-environment", future }
}
}
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
impl MockAvailabilityStore {
async fn run<Context>(self, mut ctx: Context) {
gum::debug!(target: LOG_TARGET, "Subsystem running");
loop {
let msg = ctx.recv().await.expect("Overseer never fails us");
match msg {
orchestra::FromOrchestra::Signal(signal) =>
if signal == OverseerSignal::Conclude {
return
},
orchestra::FromOrchestra::Communication { msg } => match msg {
AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx) => {
gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryAvailableData");
// We never have the full available data.
let _ = tx.send(None);
},
AvailabilityStoreMessage::QueryAllChunks(candidate_hash, tx) => {
// We always have our own chunk.
gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryAllChunks");
self.respond_to_query_all_request(candidate_hash, |index| index == 0, tx)
.await;
},
AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx) => {
gum::debug!(target: LOG_TARGET, candidate_hash = ?candidate_hash, "Responding to QueryChunkSize");
let candidate_index = self
.state
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let chunk_size =
self.state.chunks.get(*candidate_index).unwrap()[0].encoded_size();
let _ = tx.send(Some(chunk_size));
},
AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk, tx } => {
gum::debug!(target: LOG_TARGET, chunk_index = ?chunk.index ,candidate_hash = ?candidate_hash, "Responding to StoreChunk");
let _ = tx.send(Ok(()));
},
_ => {
unimplemented!("Unexpected av-store message")
},
},
}
}
}
}
@@ -0,0 +1,132 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A generic runtime api subsystem mockup suitable to be used in benchmarks.
use futures::FutureExt;
use itertools::Itertools;
use polkadot_node_subsystem::{
messages::ChainApiMessage, overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::OverseerSignal;
use polkadot_primitives::Header;
use sp_core::H256;
use std::collections::HashMap;
const LOG_TARGET: &str = "subsystem-bench::chain-api-mock";
/// State used to respond to `BlockHeader` requests.
pub struct ChainApiState {
pub block_headers: HashMap<H256, Header>,
}
pub struct MockChainApi {
state: ChainApiState,
}
impl ChainApiState {
fn get_header_by_number(&self, requested_number: u32) -> Option<&Header> {
self.block_headers.values().find(|header| header.number == requested_number)
}
}
impl MockChainApi {
pub fn new(state: ChainApiState) -> MockChainApi {
Self { state }
}
}
#[overseer::subsystem(ChainApi, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockChainApi {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "test-environment", future }
}
}
#[overseer::contextbounds(ChainApi, prefix = self::overseer)]
impl MockChainApi {
async fn run<Context>(self, mut ctx: Context) {
loop {
let msg = ctx.recv().await.expect("Overseer never fails us");
match msg {
orchestra::FromOrchestra::Signal(signal) =>
if signal == OverseerSignal::Conclude {
return
},
orchestra::FromOrchestra::Communication { msg } => {
gum::debug!(target: LOG_TARGET, msg=?msg, "recv message");
match msg {
ChainApiMessage::BlockHeader(hash, response_channel) => {
let _ = response_channel.send(Ok(Some(
self.state
.block_headers
.get(&hash)
.cloned()
.expect("Relay chain block hashes are known"),
)));
},
ChainApiMessage::FinalizedBlockNumber(val) => {
val.send(Ok(0)).unwrap();
},
ChainApiMessage::FinalizedBlockHash(requested_number, sender) => {
let hash = self
.state
.get_header_by_number(requested_number)
.expect("Unknow block number")
.hash();
sender.send(Ok(Some(hash))).unwrap();
},
ChainApiMessage::BlockNumber(requested_hash, sender) => {
sender
.send(Ok(Some(
self.state
.block_headers
.get(&requested_hash)
.expect("Unknown block hash")
.number,
)))
.unwrap();
},
ChainApiMessage::Ancestors { hash, k: _, response_channel } => {
let block_number = self
.state
.block_headers
.get(&hash)
.expect("Unknown block hash")
.number;
let ancestors = self
.state
.block_headers
.iter()
.filter(|(_, header)| header.number < block_number)
.sorted_by(|a, b| a.1.number.cmp(&b.1.number))
.map(|(hash, _)| *hash)
.collect_vec();
response_channel.send(Ok(ancestors)).unwrap();
},
_ => {
unimplemented!("Unexpected chain-api message")
},
}
},
}
}
}
}
@@ -0,0 +1,100 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Dummy subsystem mocks.
use futures::FutureExt;
use paste::paste;
use polkadot_node_subsystem::{overseer, SpawnedSubsystem, SubsystemError};
use std::time::Duration;
use tokio::time::sleep;
const LOG_TARGET: &str = "subsystem-bench::mockery";
macro_rules! mock {
// Just query by relay parent
($subsystem_name:ident) => {
paste! {
pub struct [<Mock $subsystem_name >] {}
#[overseer::subsystem($subsystem_name, error=SubsystemError, prefix=self::overseer)]
impl<Context> [<Mock $subsystem_name >] {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
// The name will appear in substrate CPU task metrics as `task_group`.`
SpawnedSubsystem { name: "test-environment", future }
}
}
#[overseer::contextbounds($subsystem_name, prefix = self::overseer)]
impl [<Mock $subsystem_name >] {
async fn run<Context>(self, mut ctx: Context) {
let mut count_total_msg = 0;
loop {
futures::select!{
msg = ctx.recv().fuse() => {
match msg.unwrap() {
orchestra::FromOrchestra::Signal(signal) => {
match signal {
polkadot_node_subsystem_types::OverseerSignal::Conclude => {return},
_ => {}
}
},
orchestra::FromOrchestra::Communication { msg } => {
gum::debug!(target: LOG_TARGET, msg = ?msg, "mocked subsystem received message");
}
}
count_total_msg +=1;
}
_ = sleep(Duration::from_secs(6)).fuse() => {
if count_total_msg > 0 {
gum::trace!(target: LOG_TARGET, "Subsystem {} processed {} messages since last time", stringify!($subsystem_name), count_total_msg);
}
count_total_msg = 0;
}
}
}
}
}
}
};
}
// Generate dummy implementation for all subsystems
mock!(AvailabilityStore);
mock!(StatementDistribution);
mock!(BitfieldSigning);
mock!(BitfieldDistribution);
mock!(Provisioner);
mock!(NetworkBridgeRx);
mock!(CollationGeneration);
mock!(CollatorProtocol);
mock!(GossipSupport);
mock!(DisputeDistribution);
mock!(DisputeCoordinator);
mock!(ProspectiveParachains);
mock!(PvfChecker);
mock!(CandidateBacking);
mock!(AvailabilityDistribution);
mock!(CandidateValidation);
mock!(AvailabilityRecovery);
mock!(NetworkBridgeTx);
mock!(ChainApi);
mock!(ChainSelection);
mock!(ApprovalVoting);
mock!(ApprovalDistribution);
mock!(RuntimeApi);
@@ -0,0 +1,88 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem::HeadSupportsParachains;
use polkadot_node_subsystem_types::Hash;
use sp_consensus::SyncOracle;
pub mod av_store;
pub mod chain_api;
pub mod dummy;
pub mod network_bridge;
pub mod runtime_api;
pub struct AlwaysSupportsParachains {}
#[async_trait::async_trait]
impl HeadSupportsParachains for AlwaysSupportsParachains {
async fn head_supports_parachains(&self, _head: &Hash) -> bool {
true
}
}
// An orchestra with dummy subsystems
#[macro_export]
macro_rules! dummy_builder {
($spawn_task_handle: ident, $metrics: ident) => {{
use $crate::mock::dummy::*;
// Initialize a mock overseer.
// All subsystem except approval_voting and approval_distribution are mock subsystems.
Overseer::builder()
.approval_voting(MockApprovalVoting {})
.approval_distribution(MockApprovalDistribution {})
.availability_recovery(MockAvailabilityRecovery {})
.candidate_validation(MockCandidateValidation {})
.chain_api(MockChainApi {})
.chain_selection(MockChainSelection {})
.dispute_coordinator(MockDisputeCoordinator {})
.runtime_api(MockRuntimeApi {})
.network_bridge_tx(MockNetworkBridgeTx {})
.availability_distribution(MockAvailabilityDistribution {})
.availability_store(MockAvailabilityStore {})
.pvf_checker(MockPvfChecker {})
.candidate_backing(MockCandidateBacking {})
.statement_distribution(MockStatementDistribution {})
.bitfield_signing(MockBitfieldSigning {})
.bitfield_distribution(MockBitfieldDistribution {})
.provisioner(MockProvisioner {})
.network_bridge_rx(MockNetworkBridgeRx {})
.collation_generation(MockCollationGeneration {})
.collator_protocol(MockCollatorProtocol {})
.gossip_support(MockGossipSupport {})
.dispute_distribution(MockDisputeDistribution {})
.prospective_parachains(MockProspectiveParachains {})
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.metrics($metrics)
.supports_parachains(AlwaysSupportsParachains {})
.spawner(SpawnGlue($spawn_task_handle))
}};
}
#[derive(Clone)]
pub struct TestSyncOracle {}
impl SyncOracle for TestSyncOracle {
fn is_major_syncing(&self) -> bool {
false
}
fn is_offline(&self) -> bool {
unimplemented!("not used by subsystem benchmarks")
}
}
@@ -0,0 +1,210 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Mocked `network-bridge` subsystems that uses a `NetworkInterface` to access
//! the emulated network.
use crate::{
configuration::TestAuthorities,
network::{NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt},
};
use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt};
use polkadot_node_network_protocol::Versioned;
use polkadot_node_subsystem::{
messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::{
messages::{ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent},
OverseerSignal,
};
use sc_network::{request_responses::ProtocolConfig, RequestFailure};
const LOG_TARGET: &str = "subsystem-bench::network-bridge";
const CHUNK_REQ_PROTOCOL_NAME_V1: &str =
"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1";
/// A mock of the network bridge tx subsystem.
pub struct MockNetworkBridgeTx {
/// A network emulator handle
network: NetworkEmulatorHandle,
/// A channel to the network interface,
to_network_interface: UnboundedSender<NetworkMessage>,
/// Test authorithies
test_authorithies: TestAuthorities,
}
/// A mock of the network bridge tx subsystem.
pub struct MockNetworkBridgeRx {
/// A network interface receiver
network_receiver: NetworkInterfaceReceiver,
/// Chunk request sender
chunk_request_sender: Option<ProtocolConfig>,
}
impl MockNetworkBridgeTx {
pub fn new(
network: NetworkEmulatorHandle,
to_network_interface: UnboundedSender<NetworkMessage>,
test_authorithies: TestAuthorities,
) -> MockNetworkBridgeTx {
Self { network, to_network_interface, test_authorithies }
}
}
impl MockNetworkBridgeRx {
pub fn new(
network_receiver: NetworkInterfaceReceiver,
chunk_request_sender: Option<ProtocolConfig>,
) -> MockNetworkBridgeRx {
Self { network_receiver, chunk_request_sender }
}
}
#[overseer::subsystem(NetworkBridgeTx, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockNetworkBridgeTx {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "network-bridge-tx", future }
}
}
#[overseer::subsystem(NetworkBridgeRx, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockNetworkBridgeRx {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "network-bridge-rx", future }
}
}
#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
impl MockNetworkBridgeTx {
async fn run<Context>(self, mut ctx: Context) {
// Main subsystem loop.
loop {
let subsystem_message = ctx.recv().await.expect("Overseer never fails us");
match subsystem_message {
orchestra::FromOrchestra::Signal(signal) =>
if signal == OverseerSignal::Conclude {
return
},
orchestra::FromOrchestra::Communication { msg } => match msg {
NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => {
for request in requests {
gum::debug!(target: LOG_TARGET, request = ?request, "Processing request");
let peer_id =
request.authority_id().expect("all nodes are authorities").clone();
if !self.network.is_peer_connected(&peer_id) {
// Attempting to send a request to a disconnected peer.
request
.into_response_sender()
.send(Err(RequestFailure::NotConnected))
.expect("send never fails");
continue
}
let peer_message =
NetworkMessage::RequestFromNode(peer_id.clone(), request);
let _ = self.to_network_interface.unbounded_send(peer_message);
}
},
NetworkBridgeTxMessage::ReportPeer(_) => {
// ingore rep changes
},
NetworkBridgeTxMessage::SendValidationMessage(peers, message) => {
for peer in peers {
self.to_network_interface
.unbounded_send(NetworkMessage::MessageFromNode(
self.test_authorithies
.peer_id_to_authority
.get(&peer)
.unwrap()
.clone(),
message.clone(),
))
.expect("Should not fail");
}
},
_ => unimplemented!("Unexpected network bridge message"),
},
}
}
}
}
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
impl MockNetworkBridgeRx {
async fn run<Context>(mut self, mut ctx: Context) {
// Main subsystem loop.
let mut from_network_interface = self.network_receiver.0;
loop {
futures::select! {
maybe_peer_message = from_network_interface.next() => {
if let Some(message) = maybe_peer_message {
match message {
NetworkMessage::MessageFromPeer(peer_id, message) => match message {
Versioned::V2(
polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
bitfield,
),
) => {
ctx.send_message(
BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V2(bitfield)))
).await;
},
Versioned::V3(
polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg)
) => {
ctx.send_message(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg)))
).await;
}
_ => {
unimplemented!("We only talk v2 network protocol")
},
},
NetworkMessage::RequestFromPeer(request) => {
if let Some(protocol) = self.chunk_request_sender.as_mut() {
assert_eq!(&*protocol.name, CHUNK_REQ_PROTOCOL_NAME_V1);
if let Some(inbound_queue) = protocol.inbound_queue.as_ref() {
inbound_queue
.send(request)
.await
.expect("Forwarding requests to subsystem never fails");
}
}
},
_ => {
panic!("NetworkMessage::RequestFromNode is not expected to be received from a peer")
}
}
}
},
subsystem_message = ctx.recv().fuse() => {
match subsystem_message.expect("Overseer never fails us") {
orchestra::FromOrchestra::Signal(signal) => if signal == OverseerSignal::Conclude { return },
_ => {
unimplemented!("Unexpected network bridge rx message")
},
}
}
}
}
}
}
@@ -0,0 +1,233 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A generic runtime api subsystem mockup suitable to be used in benchmarks.
use crate::configuration::{TestAuthorities, TestConfiguration};
use bitvec::prelude::BitVec;
use futures::FutureExt;
use itertools::Itertools;
use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest},
overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::OverseerSignal;
use polkadot_primitives::{
vstaging::NodeFeatures, CandidateEvent, CandidateReceipt, CoreState, GroupIndex, IndexedVec,
OccupiedCore, SessionIndex, SessionInfo, ValidatorIndex,
};
use sp_consensus_babe::Epoch as BabeEpoch;
use sp_core::H256;
use std::collections::HashMap;
const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock";
/// Minimal state to answer requests.
pub struct RuntimeApiState {
// All authorities in the test,
authorities: TestAuthorities,
// Candidate hashes per block
candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
// Included candidates per bock
included_candidates: HashMap<H256, Vec<CandidateEvent>>,
babe_epoch: Option<BabeEpoch>,
// The session child index,
session_index: SessionIndex,
}
/// A mocked `runtime-api` subsystem.
pub struct MockRuntimeApi {
state: RuntimeApiState,
config: TestConfiguration,
}
impl MockRuntimeApi {
pub fn new(
config: TestConfiguration,
authorities: TestAuthorities,
candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
included_candidates: HashMap<H256, Vec<CandidateEvent>>,
babe_epoch: Option<BabeEpoch>,
session_index: SessionIndex,
) -> MockRuntimeApi {
Self {
state: RuntimeApiState {
authorities,
candidate_hashes,
included_candidates,
babe_epoch,
session_index,
},
config,
}
}
fn session_info(&self) -> SessionInfo {
session_info_for_peers(&self.config, &self.state.authorities)
}
}
/// Generates a test session info with all passed authorities as consensus validators.
pub fn session_info_for_peers(
configuration: &TestConfiguration,
authorities: &TestAuthorities,
) -> SessionInfo {
let all_validators = (0..configuration.n_validators)
.map(|i| ValidatorIndex(i as _))
.collect::<Vec<_>>();
let validator_groups = all_validators
.chunks(configuration.max_validators_per_core)
.map(Vec::from)
.collect::<Vec<_>>();
SessionInfo {
validators: authorities.validator_public.iter().cloned().collect(),
discovery_keys: authorities.validator_authority_id.to_vec(),
assignment_keys: authorities.validator_assignment_id.to_vec(),
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(validator_groups),
n_cores: configuration.n_cores as u32,
needed_approvals: configuration.needed_approvals as u32,
zeroth_delay_tranche_width: configuration.zeroth_delay_tranche_width as u32,
relay_vrf_modulo_samples: configuration.relay_vrf_modulo_samples as u32,
n_delay_tranches: configuration.n_delay_tranches as u32,
no_show_slots: configuration.no_show_slots as u32,
active_validator_indices: (0..authorities.validator_authority_id.len())
.map(|index| ValidatorIndex(index as u32))
.collect_vec(),
dispute_period: 6,
random_seed: [0u8; 32],
}
}
#[overseer::subsystem(RuntimeApi, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockRuntimeApi {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "test-environment", future }
}
}
#[overseer::contextbounds(RuntimeApi, prefix = self::overseer)]
impl MockRuntimeApi {
async fn run<Context>(self, mut ctx: Context) {
let validator_group_count = self.session_info().validator_groups.len();
loop {
let msg = ctx.recv().await.expect("Overseer never fails us");
match msg {
orchestra::FromOrchestra::Signal(signal) =>
if signal == OverseerSignal::Conclude {
return
},
orchestra::FromOrchestra::Communication { msg } => {
gum::debug!(target: LOG_TARGET, msg=?msg, "recv message");
match msg {
RuntimeApiMessage::Request(
request,
RuntimeApiRequest::CandidateEvents(sender),
) => {
let candidate_events = self.state.included_candidates.get(&request);
let _ = sender.send(Ok(candidate_events.cloned().unwrap_or_default()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionInfo(_session_index, sender),
) => {
let _ = sender.send(Ok(Some(self.session_info())));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionExecutorParams(_session_index, sender),
) => {
let _ = sender.send(Ok(Some(Default::default())));
},
RuntimeApiMessage::Request(
_request,
RuntimeApiRequest::NodeFeatures(_session_index, sender),
) => {
let _ = sender.send(Ok(NodeFeatures::EMPTY));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::Validators(sender),
) => {
let _ =
sender.send(Ok(self.state.authorities.validator_public.clone()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionIndexForChild(sender),
) => {
// Session is always the same.
let _ = sender.send(Ok(self.state.session_index));
},
RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::AvailabilityCores(sender),
) => {
let candidate_hashes = self
.state
.candidate_hashes
.get(&block_hash)
.expect("Relay chain block hashes are generated at test start");
// All cores are always occupied.
let cores = candidate_hashes
.iter()
.enumerate()
.map(|(index, candidate_receipt)| {
// Ensure test breaks if badly configured.
assert!(index < validator_group_count);
CoreState::Occupied(OccupiedCore {
next_up_on_available: None,
occupied_since: 0,
time_out_at: 0,
next_up_on_time_out: None,
availability: BitVec::default(),
group_responsible: GroupIndex(index as u32),
candidate_hash: candidate_receipt.hash(),
candidate_descriptor: candidate_receipt.descriptor.clone(),
})
})
.collect::<Vec<_>>();
let _ = sender.send(Ok(cores));
},
RuntimeApiMessage::Request(
_request,
RuntimeApiRequest::CurrentBabeEpoch(sender),
) => {
let _ = sender.send(Ok(self
.state
.babe_epoch
.clone()
.expect("Babe epoch unpopulated")));
},
// Long term TODO: implement more as needed.
message => {
unimplemented!("Unexpected runtime-api message: {:?}", message)
},
}
},
}
}
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,146 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test usage implementation
use colored::Colorize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Serialize, Deserialize)]
pub struct BenchmarkUsage {
pub benchmark_name: String,
pub network_usage: Vec<ResourceUsage>,
pub cpu_usage: Vec<ResourceUsage>,
}
impl std::fmt::Display for BenchmarkUsage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"\n{}\n\n{}\n{}\n\n{}\n{}\n",
self.benchmark_name.purple(),
format!("{:<32}{:>12}{:>12}", "Network usage, KiB", "total", "per block").blue(),
self.network_usage
.iter()
.map(|v| v.to_string())
.collect::<Vec<String>>()
.join("\n"),
format!("{:<32}{:>12}{:>12}", "CPU usage, seconds", "total", "per block").blue(),
self.cpu_usage.iter().map(|v| v.to_string()).collect::<Vec<String>>().join("\n")
)
}
}
impl BenchmarkUsage {
pub fn average(usages: &[Self]) -> Self {
let all_network_usages: Vec<&ResourceUsage> =
usages.iter().flat_map(|v| &v.network_usage).collect();
let all_cpu_usage: Vec<&ResourceUsage> = usages.iter().flat_map(|v| &v.cpu_usage).collect();
Self {
benchmark_name: usages.first().map(|v| v.benchmark_name.clone()).unwrap_or_default(),
network_usage: ResourceUsage::average_by_resource_name(&all_network_usages),
cpu_usage: ResourceUsage::average_by_resource_name(&all_cpu_usage),
}
}
pub fn check_network_usage(&self, checks: &[ResourceUsageCheck]) -> Vec<String> {
check_usage(&self.benchmark_name, &self.network_usage, checks)
}
pub fn check_cpu_usage(&self, checks: &[ResourceUsageCheck]) -> Vec<String> {
check_usage(&self.benchmark_name, &self.cpu_usage, checks)
}
pub fn cpu_usage_diff(&self, other: &Self, resource_name: &str) -> Option<f64> {
let self_res = self.cpu_usage.iter().find(|v| v.resource_name == resource_name);
let other_res = other.cpu_usage.iter().find(|v| v.resource_name == resource_name);
match (self_res, other_res) {
(Some(self_res), Some(other_res)) => Some(self_res.diff(other_res)),
_ => None,
}
}
}
fn check_usage(
benchmark_name: &str,
usage: &[ResourceUsage],
checks: &[ResourceUsageCheck],
) -> Vec<String> {
checks
.iter()
.filter_map(|check| {
check_resource_usage(usage, check)
.map(|message| format!("{}: {}", benchmark_name, message))
})
.collect()
}
fn check_resource_usage(
usage: &[ResourceUsage],
(resource_name, base, precision): &ResourceUsageCheck,
) -> Option<String> {
if let Some(usage) = usage.iter().find(|v| v.resource_name == *resource_name) {
let diff = (base - usage.per_block).abs() / base;
if diff < *precision {
None
} else {
Some(format!(
"The resource `{}` is expected to be equal to {} with a precision {}, but the current value is {}",
resource_name, base, precision, usage.per_block
))
}
} else {
Some(format!("The resource `{}` is not found", resource_name))
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ResourceUsage {
pub resource_name: String,
pub total: f64,
pub per_block: f64,
}
impl std::fmt::Display for ResourceUsage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:<32}{:>12.3}{:>12.3}", self.resource_name.cyan(), self.total, self.per_block)
}
}
impl ResourceUsage {
fn average_by_resource_name(usages: &[&Self]) -> Vec<Self> {
let mut by_name: HashMap<String, Vec<&Self>> = Default::default();
for usage in usages {
by_name.entry(usage.resource_name.clone()).or_default().push(usage);
}
let mut average = vec![];
for (resource_name, values) in by_name {
let total = values.iter().map(|v| v.total).sum::<f64>() / values.len() as f64;
let per_block = values.iter().map(|v| v.per_block).sum::<f64>() / values.len() as f64;
average.push(Self { resource_name, total, per_block });
}
average
}
fn diff(&self, other: &Self) -> f64 {
(self.per_block - other.per_block).abs() / self.per_block
}
}
type ResourceUsageCheck<'a> = (&'a str, f64, f64);