Add subsystem benchmarks for availability-distribution and biftield-distribution (availability write) (#2970)

Introduce a new test objective : `DataAvailabilityWrite`.

The new benchmark measures the network and cpu usage of
`availability-distribution`, `biftield-distribution` and
`availability-store` subsystems from the perspective of a validator node
during the process when candidates are made available.

Additionally I refactored the networking emulation to support bandwidth
acounting and limits of incoming and outgoing requests.

Screenshot of succesful run


<img width="1293" alt="Screenshot 2024-01-17 at 19 17 44"
src="https://github.com/paritytech/polkadot-sdk/assets/54316454/fde11280-e25b-4dc3-9dc9-d4b9752f9b7a">

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2024-01-25 19:02:24 +02:00
committed by GitHub
parent 73fd8cd717
commit 47e46d178b
22 changed files with 1967 additions and 801 deletions
@@ -17,11 +17,13 @@
//! Test configuration definition and helpers.
use super::*;
use keyring::Keyring;
use std::{path::Path, time::Duration};
use std::path::Path;
pub use crate::cli::TestObjective;
use polkadot_primitives::{AuthorityDiscoveryId, ValidatorId};
use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
use rand::thread_rng;
use rand_distr::{Distribution, Normal, Uniform};
use serde::{Deserialize, Serialize};
pub fn random_pov_size(min_pov_size: usize, max_pov_size: usize) -> usize {
@@ -34,13 +36,13 @@ fn random_uniform_sample<T: Into<usize> + From<usize>>(min_value: T, max_value:
.into()
}
/// Peer response latency configuration.
/// Peer networking latency configuration.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PeerLatency {
/// Min latency for `NetworkAction` completion.
pub min_latency: Duration,
/// Max latency or `NetworkAction` completion.
pub max_latency: Duration,
/// The mean latency(milliseconds) of the peers.
pub mean_latency_ms: usize,
/// The standard deviation
pub std_dev: f64,
}
// Default PoV size in KiB.
@@ -58,6 +60,11 @@ fn default_connectivity() -> usize {
100
}
// Default backing group size
fn default_backing_group_size() -> usize {
5
}
/// The test input parameters
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TestConfiguration {
@@ -67,6 +74,9 @@ pub struct TestConfiguration {
pub n_validators: usize,
/// Number of cores
pub n_cores: usize,
/// Maximum backing group size
#[serde(default = "default_backing_group_size")]
pub max_validators_per_core: usize,
/// The min PoV size
#[serde(default = "default_pov_size")]
pub min_pov_size: usize,
@@ -82,12 +92,9 @@ pub struct TestConfiguration {
/// The amount of bandiwdth our node has.
#[serde(default = "default_bandwidth")]
pub bandwidth: usize,
/// Optional peer emulation latency
/// Optional peer emulation latency (round trip time) wrt node under test
#[serde(default)]
pub latency: Option<PeerLatency>,
/// Error probability, applies to sending messages to the emulated network peers
#[serde(default)]
pub error: usize,
/// Connectivity ratio, the percentage of peers we are not connected to, but ar part of
/// the topology.
#[serde(default = "default_connectivity")]
@@ -129,7 +136,7 @@ impl TestSequence {
/// Helper struct for authority related state.
#[derive(Clone)]
pub struct TestAuthorities {
pub keyrings: Vec<Keyring>,
pub keyring: Keyring,
pub validator_public: Vec<ValidatorId>,
pub validator_authority_id: Vec<AuthorityDiscoveryId>,
}
@@ -146,25 +153,27 @@ impl TestConfiguration {
pub fn pov_sizes(&self) -> &[usize] {
&self.pov_sizes
}
/// Return the number of peers connected to our node.
pub fn connected_count(&self) -> usize {
((self.n_validators - 1) as f64 / (100.0 / self.connectivity as f64)) as usize
}
/// Generates the authority keys we need for the network emulation.
pub fn generate_authorities(&self) -> TestAuthorities {
let keyrings = (0..self.n_validators)
.map(|peer_index| Keyring::new(format!("Node{}", peer_index)))
let keyring = Keyring::default();
let keys = (0..self.n_validators)
.map(|peer_index| keyring.sr25519_new(format!("Node{}", peer_index)))
.collect::<Vec<_>>();
// Generate `AuthorityDiscoveryId`` for each peer
let validator_public: Vec<ValidatorId> = keyrings
.iter()
.map(|keyring: &Keyring| keyring.clone().public().into())
.collect::<Vec<_>>();
let validator_public: Vec<ValidatorId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
let validator_authority_id: Vec<AuthorityDiscoveryId> = keyrings
.iter()
.map(|keyring| keyring.clone().public().into())
.collect::<Vec<_>>();
let validator_authority_id: Vec<AuthorityDiscoveryId> =
keys.iter().map(|key| (*key).into()).collect::<Vec<_>>();
TestAuthorities { keyrings, validator_public, validator_authority_id }
TestAuthorities { keyring, validator_public, validator_authority_id }
}
/// An unconstrained standard configuration matching Polkadot/Kusama
@@ -180,12 +189,12 @@ impl TestConfiguration {
objective,
n_cores,
n_validators,
max_validators_per_core: 5,
pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size),
bandwidth: 50 * 1024 * 1024,
peer_bandwidth: 50 * 1024 * 1024,
// No latency
latency: None,
error: 0,
num_blocks,
min_pov_size,
max_pov_size,
@@ -205,14 +214,11 @@ impl TestConfiguration {
objective,
n_cores,
n_validators,
max_validators_per_core: 5,
pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size),
bandwidth: 50 * 1024 * 1024,
peer_bandwidth: 50 * 1024 * 1024,
latency: Some(PeerLatency {
min_latency: Duration::from_millis(1),
max_latency: Duration::from_millis(100),
}),
error: 3,
latency: Some(PeerLatency { mean_latency_ms: 50, std_dev: 12.5 }),
num_blocks,
min_pov_size,
max_pov_size,
@@ -232,14 +238,11 @@ impl TestConfiguration {
objective,
n_cores,
n_validators,
max_validators_per_core: 5,
pov_sizes: generate_pov_sizes(n_cores, min_pov_size, max_pov_size),
bandwidth: 50 * 1024 * 1024,
peer_bandwidth: 50 * 1024 * 1024,
latency: Some(PeerLatency {
min_latency: Duration::from_millis(10),
max_latency: Duration::from_millis(500),
}),
error: 33,
latency: Some(PeerLatency { mean_latency_ms: 150, std_dev: 40.0 }),
num_blocks,
min_pov_size,
max_pov_size,
@@ -248,15 +251,14 @@ impl TestConfiguration {
}
}
/// Produce a randomized duration between `min` and `max`.
pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> Option<Duration> {
maybe_peer_latency.map(|peer_latency| {
Uniform::from(peer_latency.min_latency..=peer_latency.max_latency).sample(&mut thread_rng())
})
}
/// Generate a random error based on `probability`.
/// `probability` should be a number between 0 and 100.
pub fn random_error(probability: usize) -> bool {
Uniform::from(0..=99).sample(&mut thread_rng()) < probability
/// Sample latency (in milliseconds) from a normal distribution with parameters
/// specified in `maybe_peer_latency`.
pub fn random_latency(maybe_peer_latency: Option<&PeerLatency>) -> usize {
maybe_peer_latency
.map(|latency_config| {
Normal::new(latency_config.mean_latency_ms as f64, latency_config.std_dev)
.expect("normal distribution parameters are good")
.sample(&mut thread_rng())
})
.unwrap_or(0.0) as usize
}
@@ -180,12 +180,13 @@ pub fn parse_metrics(registry: &Registry) -> MetricCollection {
pub fn display_configuration(test_config: &TestConfiguration) {
gum::info!(
"{}, {}, {}, {}, {}",
"[{}] {}, {}, {}, {}, {}",
format!("objective = {:?}", test_config.objective).green(),
format!("n_validators = {}", test_config.n_validators).blue(),
format!("n_cores = {}", test_config.n_cores).blue(),
format!("pov_size = {} - {}", test_config.min_pov_size, test_config.max_pov_size)
.bright_black(),
format!("error = {}", test_config.error).bright_black(),
format!("connectivity = {}", test_config.connectivity).bright_black(),
format!("latency = {:?}", test_config.latency).bright_black(),
);
}
@@ -15,12 +15,12 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Test environment implementation
use crate::{
core::{mock::AlwaysSupportsParachains, network::NetworkEmulator},
core::{mock::AlwaysSupportsParachains, network::NetworkEmulatorHandle},
TestConfiguration,
};
use colored::Colorize;
use core::time::Duration;
use futures::FutureExt;
use futures::{Future, FutureExt};
use polkadot_overseer::{BlockInfo, Handle as OverseerHandle};
use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt};
@@ -29,15 +29,12 @@ use polkadot_node_subsystem_util::metrics::prometheus::{
self, Gauge, Histogram, PrometheusError, Registry, U64,
};
use sc_network::peer_store::LOG_TARGET;
use sc_service::{SpawnTaskHandle, TaskManager};
use std::{
fmt::Display,
net::{Ipv4Addr, SocketAddr},
};
use std::net::{Ipv4Addr, SocketAddr};
use tokio::runtime::Handle;
const MIB: f64 = 1024.0 * 1024.0;
const LOG_TARGET: &str = "subsystem-bench::environment";
use super::configuration::TestAuthorities;
/// Test environment/configuration metrics
#[derive(Clone)]
@@ -56,9 +53,8 @@ pub struct TestEnvironmentMetrics {
impl TestEnvironmentMetrics {
pub fn new(registry: &Registry) -> Result<Self, PrometheusError> {
let mut buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
let buckets = prometheus::exponential_buckets(16384.0, 2.0, 9)
.expect("arguments are always valid; qed");
buckets.extend(vec![5.0 * MIB, 6.0 * MIB, 7.0 * MIB, 8.0 * MIB, 9.0 * MIB, 10.0 * MIB]);
Ok(Self {
n_validators: prometheus::register(
@@ -150,7 +146,7 @@ pub const GENESIS_HASH: Hash = Hash::repeat_byte(0xff);
// We use this to bail out sending messages to the subsystem if it is overloaded such that
// the time of flight is breaches 5s.
// This should eventually be a test parameter.
const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
pub const MAX_TIME_OF_FLIGHT: Duration = Duration::from_millis(5000);
/// The test environment is the high level wrapper of all things required to test
/// a certain subsystem.
@@ -189,9 +185,11 @@ pub struct TestEnvironment {
/// The test configuration.
config: TestConfiguration,
/// A handle to the network emulator.
network: NetworkEmulator,
network: NetworkEmulatorHandle,
/// Configuration/env metrics
metrics: TestEnvironmentMetrics,
/// Test authorities generated from the configuration.
authorities: TestAuthorities,
}
impl TestEnvironment {
@@ -199,9 +197,10 @@ impl TestEnvironment {
pub fn new(
dependencies: TestEnvironmentDependencies,
config: TestConfiguration,
network: NetworkEmulator,
network: NetworkEmulatorHandle,
overseer: Overseer<SpawnGlue<SpawnTaskHandle>, AlwaysSupportsParachains>,
overseer_handle: OverseerHandle,
authorities: TestAuthorities,
) -> Self {
let metrics = TestEnvironmentMetrics::new(&dependencies.registry)
.expect("Metrics need to be registered");
@@ -230,30 +229,62 @@ impl TestEnvironment {
config,
network,
metrics,
authorities,
}
}
/// Returns the test configuration.
pub fn config(&self) -> &TestConfiguration {
&self.config
}
pub fn network(&self) -> &NetworkEmulator {
/// Returns a reference to the inner network emulator handle.
pub fn network(&self) -> &NetworkEmulatorHandle {
&self.network
}
/// Returns the Prometheus registry.
pub fn registry(&self) -> &Registry {
&self.dependencies.registry
}
/// Spawn a named task in the `test-environment` task group.
#[allow(unused)]
pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.dependencies
.task_manager
.spawn_handle()
.spawn(name, "test-environment", task);
}
/// Spawn a blocking named task in the `test-environment` task group.
pub fn spawn_blocking(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) {
self.dependencies.task_manager.spawn_handle().spawn_blocking(
name,
"test-environment",
task,
);
}
/// Returns a reference to the test environment metrics instance
pub fn metrics(&self) -> &TestEnvironmentMetrics {
&self.metrics
}
/// Returns a handle to the tokio runtime.
pub fn runtime(&self) -> Handle {
self.runtime_handle.clone()
}
// Send a message to the subsystem under test environment.
/// Returns a reference to the authority keys used in the test.
pub fn authorities(&self) -> &TestAuthorities {
&self.authorities
}
/// Send a message to the subsystem under test environment.
pub async fn send_message(&mut self, msg: AllMessages) {
self.overseer_handle
.send_msg(msg, LOG_TARGET)
@@ -264,7 +295,7 @@ impl TestEnvironment {
});
}
// Send an `ActiveLeavesUpdate` signal to all subsystems under test.
/// Send an `ActiveLeavesUpdate` signal to all subsystems under test.
pub async fn import_block(&mut self, block: BlockInfo) {
self.overseer_handle
.block_imported(block)
@@ -275,59 +306,79 @@ impl TestEnvironment {
});
}
// Stop overseer and subsystems.
/// Stop overseer and subsystems.
pub async fn stop(&mut self) {
self.overseer_handle.stop().await;
}
}
impl Display for TestEnvironment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let stats = self.network().stats();
/// Blocks until `metric_name` == `value`
pub async fn wait_until_metric_eq(&self, metric_name: &str, value: usize) {
let value = value as f64;
loop {
let test_metrics = super::display::parse_metrics(self.registry());
let current_value = test_metrics.sum_by(metric_name);
writeln!(f, "\n")?;
writeln!(
f,
"Total received from network: {}",
format!(
"{} MiB",
stats
.iter()
.enumerate()
.map(|(_index, stats)| stats.tx_bytes_total as u128)
.sum::<u128>() / (1024 * 1024)
)
.cyan()
)?;
writeln!(
f,
"Total sent to network: {}",
format!("{} KiB", stats[0].tx_bytes_total / (1024)).cyan()
)?;
gum::debug!(target: LOG_TARGET, metric_name, current_value, value, "Waiting for metric");
if current_value == value {
break
}
// Check value every 50ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
/// Display network usage stats.
pub fn display_network_usage(&self) {
let stats = self.network().peer_stats(0);
let total_node_received = stats.received() / 1024;
let total_node_sent = stats.sent() / 1024;
println!(
"\nPayload bytes received from peers: {}, {}",
format!("{:.2} KiB total", total_node_received).blue(),
format!("{:.2} KiB/block", total_node_received / self.config().num_blocks)
.bright_blue()
);
println!(
"Payload bytes sent to peers: {}, {}",
format!("{:.2} KiB total", total_node_sent).blue(),
format!("{:.2} KiB/block", total_node_sent / self.config().num_blocks).bright_blue()
);
}
/// Print CPU usage stats in the CLI.
pub fn display_cpu_usage(&self, subsystems_under_test: &[&str]) {
let test_metrics = super::display::parse_metrics(self.registry());
let subsystem_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "availability-recovery");
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
writeln!(f, "Total subsystem CPU usage {}", format!("{:.2}s", total_cpu).bright_purple())?;
writeln!(
f,
"CPU usage per block {}",
format!("{:.2}s", total_cpu / self.config().num_blocks as f64).bright_purple()
)?;
for subsystem in subsystems_under_test.iter() {
let subsystem_cpu_metrics =
test_metrics.subset_with_label_value("task_group", subsystem);
let total_cpu = subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
println!(
"{} CPU usage {}",
subsystem.to_string().bright_green(),
format!("{:.3}s", total_cpu).bright_purple()
);
println!(
"{} CPU usage per block {}",
subsystem.to_string().bright_green(),
format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple()
);
}
let test_env_cpu_metrics =
test_metrics.subset_with_label_value("task_group", "test-environment");
let total_cpu = test_env_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum");
writeln!(
f,
println!(
"Total test environment CPU usage {}",
format!("{:.2}s", total_cpu).bright_purple()
)?;
writeln!(
f,
"CPU usage per block {}",
format!("{:.2}s", total_cpu / self.config().num_blocks as f64).bright_purple()
format!("{:.3}s", total_cpu).bright_purple()
);
println!(
"Test environment CPU usage per block {}",
format!("{:.3}s", total_cpu / self.config().num_blocks as f64).bright_purple()
)
}
}
@@ -14,26 +14,34 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use sp_core::{
sr25519::{Pair, Public},
Pair as PairT,
};
/// Set of test accounts.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
use polkadot_primitives::ValidatorId;
use sc_keystore::LocalKeystore;
use sp_application_crypto::AppCrypto;
pub use sp_core::sr25519;
use sp_core::sr25519::Public;
use sp_keystore::Keystore;
use std::sync::Arc;
/// Set of test accounts generated and kept safe by a keystore.
#[derive(Clone)]
pub struct Keyring {
name: String,
keystore: Arc<LocalKeystore>,
}
impl Default for Keyring {
fn default() -> Self {
Self { keystore: Arc::new(LocalKeystore::in_memory()) }
}
}
impl Keyring {
pub fn new(name: String) -> Keyring {
Self { name }
pub fn sr25519_new(&self, name: String) -> Public {
self.keystore
.sr25519_generate_new(ValidatorId::ID, Some(&format!("//{}", name)))
.expect("Insert key into keystore")
}
pub fn pair(self) -> Pair {
Pair::from_string(&format!("//{}", self.name), None).expect("input is always good; qed")
}
pub fn public(self) -> Public {
self.pair().public()
pub fn keystore(&self) -> Arc<dyn Keystore> {
self.keystore.clone()
}
}
@@ -17,13 +17,18 @@
//! A generic av store subsystem mockup suitable to be used in benchmarks.
use parity_scale_codec::Encode;
use polkadot_node_network_protocol::request_response::{
v1::{AvailableDataFetchingResponse, ChunkFetchingResponse, ChunkResponse},
Requests,
};
use polkadot_primitives::CandidateHash;
use sc_network::ProtocolName;
use std::collections::HashMap;
use futures::{channel::oneshot, FutureExt};
use polkadot_node_primitives::ErasureChunk;
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem::{
messages::AvailabilityStoreMessage, overseer, SpawnedSubsystem, SubsystemError,
@@ -31,6 +36,8 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_types::OverseerSignal;
use crate::core::network::{HandleNetworkMessage, NetworkMessage};
pub struct AvailabilityStoreState {
candidate_hashes: HashMap<CandidateHash, usize>,
chunks: Vec<Vec<ErasureChunk>>,
@@ -38,6 +45,75 @@ pub struct AvailabilityStoreState {
const LOG_TARGET: &str = "subsystem-bench::av-store-mock";
/// Mockup helper. Contains Ccunks and full availability data of all parachain blocks
/// used in a test.
pub struct NetworkAvailabilityState {
pub candidate_hashes: HashMap<CandidateHash, usize>,
pub available_data: Vec<AvailableData>,
pub chunks: Vec<Vec<ErasureChunk>>,
}
// Implement access to the state.
impl HandleNetworkMessage for NetworkAvailabilityState {
fn handle(
&self,
message: NetworkMessage,
_node_sender: &mut futures::channel::mpsc::UnboundedSender<NetworkMessage>,
) -> Option<NetworkMessage> {
match message {
NetworkMessage::RequestFromNode(peer, request) => match request {
Requests::ChunkFetchingV1(outgoing_request) => {
gum::debug!(target: LOG_TARGET, request = ?outgoing_request, "Received `RequestFromNode`");
let validator_index: usize = outgoing_request.payload.index.0 as usize;
let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = self
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let chunk: ChunkResponse =
self.chunks.get(*candidate_index).unwrap()[validator_index].clone().into();
let response = Ok((
ChunkFetchingResponse::from(Some(chunk)).encode(),
ProtocolName::Static("dummy"),
));
if let Err(err) = outgoing_request.pending_response.send(response) {
gum::error!(target: LOG_TARGET, ?err, "Failed to send `ChunkFetchingResponse`");
}
None
},
Requests::AvailableDataFetchingV1(outgoing_request) => {
let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = self
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let available_data = self.available_data.get(*candidate_index).unwrap().clone();
let response = Ok((
AvailableDataFetchingResponse::from(Some(available_data)).encode(),
ProtocolName::Static("dummy"),
));
outgoing_request
.pending_response
.send(response)
.expect("Response is always sent succesfully");
None
},
_ => Some(NetworkMessage::RequestFromNode(peer, request)),
},
message => Some(message),
}
}
}
/// A mock of the availability store subsystem. This one also generates all the
/// candidates that a
pub struct MockAvailabilityStore {
@@ -127,6 +203,10 @@ impl MockAvailabilityStore {
self.state.chunks.get(*candidate_index).unwrap()[0].encoded_size();
let _ = tx.send(Some(chunk_size));
},
AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk, tx } => {
gum::debug!(target: LOG_TARGET, chunk_index = ?chunk.index ,candidate_hash = ?candidate_hash, "Responding to StoreChunk");
let _ = tx.send(Ok(()));
},
_ => {
unimplemented!("Unexpected av-store message")
},
@@ -0,0 +1,92 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//!
//! A generic runtime api subsystem mockup suitable to be used in benchmarks.
use polkadot_primitives::Header;
use polkadot_node_subsystem::{
messages::ChainApiMessage, overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::OverseerSignal;
use sp_core::H256;
use std::collections::HashMap;
use futures::FutureExt;
const LOG_TARGET: &str = "subsystem-bench::chain-api-mock";
/// State used to respond to `BlockHeader` requests.
pub struct ChainApiState {
pub block_headers: HashMap<H256, Header>,
}
pub struct MockChainApi {
state: ChainApiState,
}
impl MockChainApi {
pub fn new(state: ChainApiState) -> MockChainApi {
Self { state }
}
}
#[overseer::subsystem(ChainApi, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockChainApi {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "test-environment", future }
}
}
#[overseer::contextbounds(ChainApi, prefix = self::overseer)]
impl MockChainApi {
async fn run<Context>(self, mut ctx: Context) {
loop {
let msg = ctx.recv().await.expect("Overseer never fails us");
match msg {
orchestra::FromOrchestra::Signal(signal) =>
if signal == OverseerSignal::Conclude {
return
},
orchestra::FromOrchestra::Communication { msg } => {
gum::debug!(target: LOG_TARGET, msg=?msg, "recv message");
match msg {
ChainApiMessage::BlockHeader(hash, response_channel) => {
let _ = response_channel.send(Ok(Some(
self.state
.block_headers
.get(&hash)
.cloned()
.expect("Relay chain block hashes are known"),
)));
},
ChainApiMessage::Ancestors { hash: _hash, k: _k, response_channel } => {
// For our purposes, no ancestors is fine.
let _ = response_channel.send(Ok(Vec::new()));
},
_ => {
unimplemented!("Unexpected chain-api message")
},
}
},
}
}
}
}
@@ -73,6 +73,7 @@ macro_rules! mock {
};
}
// Generate dummy implementation for all subsystems
mock!(AvailabilityStore);
mock!(StatementDistribution);
mock!(BitfieldSigning);
@@ -18,11 +18,14 @@ use polkadot_node_subsystem::HeadSupportsParachains;
use polkadot_node_subsystem_types::Hash;
pub mod av_store;
pub mod chain_api;
pub mod dummy;
pub mod network_bridge;
pub mod runtime_api;
pub use av_store::*;
pub use chain_api::*;
pub use network_bridge::*;
pub use runtime_api::*;
pub struct AlwaysSupportsParachains {}
@@ -14,244 +14,61 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//!
//! A generic av store subsystem mockup suitable to be used in benchmarks.
//! Mocked `network-bridge` subsystems that uses a `NetworkInterface` to access
//! the emulated network.
use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt};
use polkadot_node_subsystem_types::{
messages::{BitfieldDistributionMessage, NetworkBridgeEvent},
OverseerSignal,
};
use futures::Future;
use parity_scale_codec::Encode;
use polkadot_node_subsystem_types::OverseerSignal;
use std::{collections::HashMap, pin::Pin};
use futures::FutureExt;
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_primitives::CandidateHash;
use sc_network::{OutboundFailure, RequestFailure};
use sc_network::{request_responses::ProtocolConfig, PeerId, RequestFailure};
use polkadot_node_subsystem::{
messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_network_protocol::request_response::{
self as req_res,
v1::{AvailableDataFetchingRequest, ChunkFetchingRequest, ChunkResponse},
IsRequest, Requests,
};
use polkadot_primitives::AuthorityDiscoveryId;
use polkadot_node_network_protocol::Versioned;
use crate::core::{
configuration::{random_error, random_latency, TestConfiguration},
network::{NetworkAction, NetworkEmulator, RateLimit},
use crate::core::network::{
NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt,
};
/// The availability store state of all emulated peers.
/// The network bridge tx mock will respond to requests as if the request is being serviced
/// by a remote peer on the network
pub struct NetworkAvailabilityState {
pub candidate_hashes: HashMap<CandidateHash, usize>,
pub available_data: Vec<AvailableData>,
pub chunks: Vec<Vec<ErasureChunk>>,
}
const LOG_TARGET: &str = "subsystem-bench::network-bridge-tx-mock";
const LOG_TARGET: &str = "subsystem-bench::network-bridge";
const CHUNK_REQ_PROTOCOL_NAME_V1: &str =
"/ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff/req_chunk/1";
/// A mock of the network bridge tx subsystem.
pub struct MockNetworkBridgeTx {
/// The test configurationg
config: TestConfiguration,
/// The network availability state
availabilty: NetworkAvailabilityState,
/// A network emulator instance
network: NetworkEmulator,
/// A network emulator handle
network: NetworkEmulatorHandle,
/// A channel to the network interface,
to_network_interface: UnboundedSender<NetworkMessage>,
}
/// A mock of the network bridge tx subsystem.
pub struct MockNetworkBridgeRx {
/// A network interface receiver
network_receiver: NetworkInterfaceReceiver,
/// Chunk request sender
chunk_request_sender: Option<ProtocolConfig>,
}
impl MockNetworkBridgeTx {
pub fn new(
config: TestConfiguration,
availabilty: NetworkAvailabilityState,
network: NetworkEmulator,
network: NetworkEmulatorHandle,
to_network_interface: UnboundedSender<NetworkMessage>,
) -> MockNetworkBridgeTx {
Self { config, availabilty, network }
Self { network, to_network_interface }
}
}
fn not_connected_response(
&self,
authority_discovery_id: &AuthorityDiscoveryId,
future: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> NetworkAction {
// The network action will send the error after a random delay expires.
return NetworkAction::new(
authority_discovery_id.clone(),
future,
0,
// Generate a random latency based on configuration.
random_latency(self.config.latency.as_ref()),
)
}
/// Returns an `NetworkAction` corresponding to the peer sending the response. If
/// the peer is connected, the error is sent with a randomized latency as defined in
/// configuration.
fn respond_to_send_request(
&mut self,
request: Requests,
ingress_tx: &mut tokio::sync::mpsc::UnboundedSender<NetworkAction>,
) -> NetworkAction {
let ingress_tx = ingress_tx.clone();
match request {
Requests::ChunkFetchingV1(outgoing_request) => {
let authority_discovery_id = match outgoing_request.peer {
req_res::Recipient::Authority(authority_discovery_id) => authority_discovery_id,
_ => unimplemented!("Peer recipient not supported yet"),
};
// Account our sent request bytes.
self.network.peer_stats(0).inc_sent(outgoing_request.payload.encoded_size());
// If peer is disconnected return an error
if !self.network.is_peer_connected(&authority_discovery_id) {
// We always send `NotConnected` error and we ignore `IfDisconnected` value in
// the caller.
let future = async move {
let _ = outgoing_request
.pending_response
.send(Err(RequestFailure::NotConnected));
}
.boxed();
return self.not_connected_response(&authority_discovery_id, future)
}
// Account for remote received request bytes.
self.network
.peer_stats_by_id(&authority_discovery_id)
.inc_received(outgoing_request.payload.encoded_size());
let validator_index: usize = outgoing_request.payload.index.0 as usize;
let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = self
.availabilty
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::warn!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let chunk: ChunkResponse = self.availabilty.chunks.get(*candidate_index).unwrap()
[validator_index]
.clone()
.into();
let mut size = chunk.encoded_size();
let response = if random_error(self.config.error) {
// Error will not account to any bandwidth used.
size = 0;
Err(RequestFailure::Network(OutboundFailure::ConnectionClosed))
} else {
Ok((
req_res::v1::ChunkFetchingResponse::from(Some(chunk)).encode(),
self.network.req_protocol_names().get_name(ChunkFetchingRequest::PROTOCOL),
))
};
let authority_discovery_id_clone = authority_discovery_id.clone();
let future = async move {
let _ = outgoing_request.pending_response.send(response);
}
.boxed();
let future_wrapper = async move {
// Forward the response to the ingress channel of our node.
// On receive side we apply our node receiving rate limit.
let action =
NetworkAction::new(authority_discovery_id_clone, future, size, None);
ingress_tx.send(action).unwrap();
}
.boxed();
NetworkAction::new(
authority_discovery_id,
future_wrapper,
size,
// Generate a random latency based on configuration.
random_latency(self.config.latency.as_ref()),
)
},
Requests::AvailableDataFetchingV1(outgoing_request) => {
let candidate_hash = outgoing_request.payload.candidate_hash;
let candidate_index = self
.availabilty
.candidate_hashes
.get(&candidate_hash)
.expect("candidate was generated previously; qed");
gum::debug!(target: LOG_TARGET, ?candidate_hash, candidate_index, "Candidate mapped to index");
let authority_discovery_id = match outgoing_request.peer {
req_res::Recipient::Authority(authority_discovery_id) => authority_discovery_id,
_ => unimplemented!("Peer recipient not supported yet"),
};
// Account our sent request bytes.
self.network.peer_stats(0).inc_sent(outgoing_request.payload.encoded_size());
// If peer is disconnected return an error
if !self.network.is_peer_connected(&authority_discovery_id) {
let future = async move {
let _ = outgoing_request
.pending_response
.send(Err(RequestFailure::NotConnected));
}
.boxed();
return self.not_connected_response(&authority_discovery_id, future)
}
// Account for remote received request bytes.
self.network
.peer_stats_by_id(&authority_discovery_id)
.inc_received(outgoing_request.payload.encoded_size());
let available_data =
self.availabilty.available_data.get(*candidate_index).unwrap().clone();
let size = available_data.encoded_size();
let response = if random_error(self.config.error) {
Err(RequestFailure::Network(OutboundFailure::ConnectionClosed))
} else {
Ok((
req_res::v1::AvailableDataFetchingResponse::from(Some(available_data))
.encode(),
self.network
.req_protocol_names()
.get_name(AvailableDataFetchingRequest::PROTOCOL),
))
};
let future = async move {
let _ = outgoing_request.pending_response.send(response);
}
.boxed();
let authority_discovery_id_clone = authority_discovery_id.clone();
let future_wrapper = async move {
// Forward the response to the ingress channel of our node.
// On receive side we apply our node receiving rate limit.
let action =
NetworkAction::new(authority_discovery_id_clone, future, size, None);
ingress_tx.send(action).unwrap();
}
.boxed();
NetworkAction::new(
authority_discovery_id,
future_wrapper,
size,
// Generate a random latency based on configuration.
random_latency(self.config.latency.as_ref()),
)
},
_ => panic!("received an unexpected request"),
}
impl MockNetworkBridgeRx {
pub fn new(
network_receiver: NetworkInterfaceReceiver,
chunk_request_sender: Option<ProtocolConfig>,
) -> MockNetworkBridgeRx {
Self { network_receiver, chunk_request_sender }
}
}
@@ -260,43 +77,26 @@ impl<Context> MockNetworkBridgeTx {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "test-environment", future }
SpawnedSubsystem { name: "network-bridge-tx", future }
}
}
#[overseer::subsystem(NetworkBridgeRx, error=SubsystemError, prefix=self::overseer)]
impl<Context> MockNetworkBridgeRx {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self.run(ctx).map(|_| Ok(())).boxed();
SpawnedSubsystem { name: "network-bridge-rx", future }
}
}
#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
impl MockNetworkBridgeTx {
async fn run<Context>(mut self, mut ctx: Context) {
let (mut ingress_tx, mut ingress_rx) =
tokio::sync::mpsc::unbounded_channel::<NetworkAction>();
// Initialize our node bandwidth limits.
let mut rx_limiter = RateLimit::new(10, self.config.bandwidth);
let our_network = self.network.clone();
// This task will handle node messages receipt from the simulated network.
ctx.spawn_blocking(
"network-receive",
async move {
while let Some(action) = ingress_rx.recv().await {
let size = action.size();
// account for our node receiving the data.
our_network.inc_received(size);
rx_limiter.reap(size).await;
action.run().await;
}
}
.boxed(),
)
.expect("We never fail to spawn tasks");
async fn run<Context>(self, mut ctx: Context) {
// Main subsystem loop.
loop {
let msg = ctx.recv().await.expect("Overseer never fails us");
match msg {
let subsystem_message = ctx.recv().await.expect("Overseer never fails us");
match subsystem_message {
orchestra::FromOrchestra::Signal(signal) =>
if signal == OverseerSignal::Conclude {
return
@@ -305,14 +105,27 @@ impl MockNetworkBridgeTx {
NetworkBridgeTxMessage::SendRequests(requests, _if_disconnected) => {
for request in requests {
gum::debug!(target: LOG_TARGET, request = ?request, "Processing request");
self.network.inc_sent(request_size(&request));
let action = self.respond_to_send_request(request, &mut ingress_tx);
let peer_id =
request.authority_id().expect("all nodes are authorities").clone();
// Will account for our node sending the request over the emulated
// network.
self.network.submit_peer_action(action.peer(), action);
if !self.network.is_peer_connected(&peer_id) {
// Attempting to send a request to a disconnected peer.
request
.into_response_sender()
.send(Err(RequestFailure::NotConnected))
.expect("send never fails");
continue
}
let peer_message =
NetworkMessage::RequestFromNode(peer_id.clone(), request);
let _ = self.to_network_interface.unbounded_send(peer_message);
}
},
NetworkBridgeTxMessage::ReportPeer(_) => {
// ingore rep changes
},
_ => {
unimplemented!("Unexpected network bridge message")
},
@@ -322,12 +135,56 @@ impl MockNetworkBridgeTx {
}
}
// A helper to determine the request payload size.
fn request_size(request: &Requests) -> usize {
match request {
Requests::ChunkFetchingV1(outgoing_request) => outgoing_request.payload.encoded_size(),
Requests::AvailableDataFetchingV1(outgoing_request) =>
outgoing_request.payload.encoded_size(),
_ => unimplemented!("received an unexpected request"),
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
impl MockNetworkBridgeRx {
async fn run<Context>(mut self, mut ctx: Context) {
// Main subsystem loop.
let mut from_network_interface = self.network_receiver.0;
loop {
futures::select! {
maybe_peer_message = from_network_interface.next() => {
if let Some(message) = maybe_peer_message {
match message {
NetworkMessage::MessageFromPeer(message) => match message {
Versioned::V2(
polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution(
bitfield,
),
) => {
ctx.send_message(
BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(PeerId::random(), polkadot_node_network_protocol::Versioned::V2(bitfield)))
).await;
},
_ => {
unimplemented!("We only talk v2 network protocol")
},
},
NetworkMessage::RequestFromPeer(request) => {
if let Some(protocol) = self.chunk_request_sender.as_mut() {
assert_eq!(&*protocol.name, CHUNK_REQ_PROTOCOL_NAME_V1);
if let Some(inbound_queue) = protocol.inbound_queue.as_ref() {
inbound_queue
.send(request)
.await
.expect("Forwarding requests to subsystem never fails");
}
}
},
_ => {
panic!("NetworkMessage::RequestFromNode is not expected to be received from a peer")
}
}
}
},
subsystem_message = ctx.recv().fuse() => {
match subsystem_message.expect("Overseer never fails us") {
orchestra::FromOrchestra::Signal(signal) => if signal == OverseerSignal::Conclude { return },
_ => {
unimplemented!("Unexpected network bridge rx message")
},
}
}
}
}
}
}
@@ -16,31 +16,45 @@
//!
//! A generic runtime api subsystem mockup suitable to be used in benchmarks.
use polkadot_primitives::{GroupIndex, IndexedVec, SessionInfo, ValidatorIndex};
use polkadot_primitives::{
CandidateReceipt, CoreState, GroupIndex, IndexedVec, OccupiedCore, SessionInfo, ValidatorIndex,
};
use bitvec::prelude::BitVec;
use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest},
overseer, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_types::OverseerSignal;
use sp_core::H256;
use std::collections::HashMap;
use crate::core::configuration::{TestAuthorities, TestConfiguration};
use futures::FutureExt;
const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock";
/// Minimal state to answer requests.
pub struct RuntimeApiState {
// All authorities in the test,
authorities: TestAuthorities,
// Candidate
candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
}
/// A mocked `runtime-api` subsystem.
pub struct MockRuntimeApi {
state: RuntimeApiState,
config: TestConfiguration,
}
impl MockRuntimeApi {
pub fn new(config: TestConfiguration, authorities: TestAuthorities) -> MockRuntimeApi {
Self { state: RuntimeApiState { authorities }, config }
pub fn new(
config: TestConfiguration,
authorities: TestAuthorities,
candidate_hashes: HashMap<H256, Vec<CandidateReceipt>>,
) -> MockRuntimeApi {
Self { state: RuntimeApiState { authorities, candidate_hashes }, config }
}
fn session_info(&self) -> SessionInfo {
@@ -48,8 +62,10 @@ impl MockRuntimeApi {
.map(|i| ValidatorIndex(i as _))
.collect::<Vec<_>>();
let validator_groups = all_validators.chunks(5).map(Vec::from).collect::<Vec<_>>();
let validator_groups = all_validators
.chunks(self.config.max_validators_per_core)
.map(Vec::from)
.collect::<Vec<_>>();
SessionInfo {
validators: self.state.authorities.validator_public.clone().into(),
discovery_keys: self.state.authorities.validator_authority_id.clone(),
@@ -80,6 +96,8 @@ impl<Context> MockRuntimeApi {
#[overseer::contextbounds(RuntimeApi, prefix = self::overseer)]
impl MockRuntimeApi {
async fn run<Context>(self, mut ctx: Context) {
let validator_group_count = self.session_info().validator_groups.len();
loop {
let msg = ctx.recv().await.expect("Overseer never fails us");
@@ -93,14 +111,79 @@ impl MockRuntimeApi {
match msg {
RuntimeApiMessage::Request(
_request,
_block_hash,
RuntimeApiRequest::SessionInfo(_session_index, sender),
) => {
let _ = sender.send(Ok(Some(self.session_info())));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionExecutorParams(_session_index, sender),
) => {
let _ = sender.send(Ok(Some(Default::default())));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::Validators(sender),
) => {
let _ =
sender.send(Ok(self.state.authorities.validator_public.clone()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::CandidateEvents(sender),
) => {
let _ = sender.send(Ok(Default::default()));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::SessionIndexForChild(sender),
) => {
// Session is always the same.
let _ = sender.send(Ok(0));
},
RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::AvailabilityCores(sender),
) => {
let candidate_hashes = self
.state
.candidate_hashes
.get(&block_hash)
.expect("Relay chain block hashes are generated at test start");
// All cores are always occupied.
let cores = candidate_hashes
.iter()
.enumerate()
.map(|(index, candidate_receipt)| {
// Ensure test breaks if badly configured.
assert!(index < validator_group_count);
CoreState::Occupied(OccupiedCore {
next_up_on_available: None,
occupied_since: 0,
time_out_at: 0,
next_up_on_time_out: None,
availability: BitVec::default(),
group_responsible: GroupIndex(index as u32),
candidate_hash: candidate_receipt.hash(),
candidate_descriptor: candidate_receipt.descriptor.clone(),
})
})
.collect::<Vec<_>>();
let _ = sender.send(Ok(cores));
},
RuntimeApiMessage::Request(
_block_hash,
RuntimeApiRequest::NodeFeatures(_session_index, sender),
) => {
let _ = sender.send(Ok(Default::default()));
},
// Long term TODO: implement more as needed.
_ => {
unimplemented!("Unexpected runtime-api message")
message => {
unimplemented!("Unexpected runtime-api message: {:?}", message)
},
}
},
File diff suppressed because it is too large Load Diff