Allow to broadcast network messages in parallel (#1409)

This PR addresses multiple issues pending:

* [x] Update orchestra to the recent version and test how the node
performs
* [x] Add some useful metrics for outbound network bridge
* [x] Try to send incoming network requests to all subsystems without
blocking on some particular subsystem in that loop
* [x] Fix all incompatibilities between orchestra and polkadot code
(e.g. malus node)
This commit is contained in:
Vsevolod Stakhov
2023-09-11 19:33:51 +01:00
committed by GitHub
parent 2c8021f998
commit 44dbb73945
15 changed files with 223 additions and 56 deletions
@@ -105,9 +105,27 @@ impl Metrics {
pub fn on_report_event(&self) {
if let Some(metrics) = self.0.as_ref() {
self.on_message("report_peer");
metrics.report_events.inc()
}
}
pub fn on_message(&self, message_type: &'static str) {
if let Some(metrics) = self.0.as_ref() {
metrics.messages_sent.with_label_values(&[message_type]).inc()
}
}
pub fn on_delayed_rx_queue(&self, queue_size: usize) {
if let Some(metrics) = self.0.as_ref() {
metrics.rx_delayed_processing.observe(queue_size as f64);
}
}
pub fn time_delayed_rx_events(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.rx_delayed_processing_time.start_timer())
}
}
#[derive(Clone)]
@@ -123,6 +141,13 @@ pub(crate) struct MetricsInner {
bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,
messages_sent: prometheus::CounterVec<prometheus::U64>,
// The reason why a `Histogram` is used to track a queue size is that
// we need not only an average size of the queue (that will be 0 normally), but
// we also need a dynamics for this queue size in case of messages delays.
rx_delayed_processing: prometheus::Histogram,
rx_delayed_processing_time: prometheus::Histogram,
}
impl metrics::Metrics for Metrics {
@@ -217,6 +242,34 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
messages_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"polkadot_parachain_messages_sent_total",
"The number of messages sent via network bridge",
),
&["type"]
)?,
registry,
)?,
rx_delayed_processing: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_network_bridge_rx_delayed",
"Number of events being delayed while broadcasting from the network bridge",
).buckets(vec![0.0, 1.0, 2.0, 8.0, 16.0]),
)?,
registry,
)?,
rx_delayed_processing_time: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_network_bridge_rx_delayed_time",
"Time spent for waiting of the delayed events",
),
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
@@ -61,6 +61,7 @@ pub(crate) fn send_message<M>(
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
metrics.on_message(std::any::type_name::<M>());
encoded
};
+60 -9
View File
@@ -20,7 +20,10 @@ use super::*;
use always_assert::never;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{
future::BoxFuture,
stream::{BoxStream, FuturesUnordered, StreamExt},
};
use parity_scale_codec::{Decode, DecodeAll};
use sc_network::Event as NetworkEvent;
@@ -244,6 +247,7 @@ where
NetworkBridgeEvent::PeerViewChange(peer, View::default()),
],
&mut sender,
&metrics,
)
.await;
@@ -352,6 +356,7 @@ where
dispatch_validation_event_to_all(
NetworkBridgeEvent::PeerDisconnected(peer),
&mut sender,
&metrics,
)
.await,
PeerSet::Collation =>
@@ -490,7 +495,7 @@ where
network_service.report_peer(remote, report.into());
}
dispatch_validation_events_to_all(events, &mut sender).await;
dispatch_validation_events_to_all(events, &mut sender, &metrics).await;
}
if !c_messages.is_empty() {
@@ -992,8 +997,9 @@ fn send_collation_message_vstaging(
async fn dispatch_validation_event_to_all(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
metrics: &Metrics,
) {
dispatch_validation_events_to_all(std::iter::once(event), ctx).await
dispatch_validation_events_to_all(std::iter::once(event), ctx, metrics).await
}
async fn dispatch_collation_event_to_all(
@@ -1038,20 +1044,65 @@ fn dispatch_collation_event_to_all_unbounded(
}
}
fn send_or_queue_validation_event<E, Sender>(
event: E,
sender: &mut Sender,
delayed_queue: &FuturesUnordered<BoxFuture<'static, ()>>,
) where
E: Send + 'static,
Sender: overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>,
{
match sender.try_send_message(event) {
Ok(()) => {},
Err(overseer::TrySendError::Full(event)) => {
let mut sender = sender.clone();
delayed_queue.push(Box::pin(async move {
sender.send_message(event).await;
}));
},
Err(overseer::TrySendError::Closed(_)) => {
panic!(
"NetworkBridgeRxSender is closed when trying to send event of type: {}",
std::any::type_name::<E>()
);
},
}
}
async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
metrics: &Metrics,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
let delayed_messages: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
// Fast path for sending events to subsystems, if any subsystem's queue is full, we hold
// the slow path future in the `delayed_messages` queue.
for event in events {
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
}
let delayed_messages_count = delayed_messages.len();
metrics.on_delayed_rx_queue(delayed_messages_count);
if delayed_messages_count > 0 {
// Here we wait for all the delayed messages to be sent.
let _timer = metrics.time_delayed_rx_events(); // Dropped after `await` is completed
let _: Vec<()> = delayed_messages.collect().await;
}
}
@@ -33,6 +33,7 @@ use polkadot_node_subsystem::{
///
/// To be passed to [`FullNetworkConfiguration::add_notification_protocol`]().
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::request_response::Requests;
use sc_network::ReputationChange;
use crate::validator_discovery;
@@ -290,6 +291,20 @@ where
);
for req in reqs {
match req {
Requests::ChunkFetchingV1(_) => metrics.on_message("chunk_fetching_v1"),
Requests::AvailableDataFetchingV1(_) =>
metrics.on_message("available_data_fetching_v1"),
Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
Requests::CollationFetchingVStaging(_) =>
metrics.on_message("collation_fetching_vstaging"),
Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
Requests::StatementFetchingV1(_) => metrics.on_message("statement_fetching_v1"),
Requests::AttestedCandidateVStaging(_) =>
metrics.on_message("attested_candidate_vstaging"),
}
network_service
.start_request(
&mut authority_discovery_service,