Revert #1409 partially (#1603)

Futures channels that are used by default has a side effect of
`Sender::Clone` that efficiently increases the capacity of the bounded
channel by one. This PR fixes the undesired backpressure removal that
was caused by the #1409. This issue has been discovered by @sandreim
during Versi testing and needs to be treated as critical that should not
be included in any release without this reversion.

This PR reverts the original behaviour.
This commit is contained in:
Vsevolod Stakhov
2023-09-18 21:32:19 +01:00
committed by GitHub
parent ffe5db0f76
commit 122086d3d5
+8 -55
View File
@@ -20,10 +20,7 @@ use super::*;
use always_assert::never;
use bytes::Bytes;
use futures::{
future::BoxFuture,
stream::{BoxStream, FuturesUnordered, StreamExt},
};
use futures::stream::{BoxStream, StreamExt};
use parity_scale_codec::{Decode, DecodeAll};
use sc_network::Event as NetworkEvent;
@@ -1044,65 +1041,21 @@ fn dispatch_collation_event_to_all_unbounded(
}
}
fn send_or_queue_validation_event<E, Sender>(
event: E,
sender: &mut Sender,
delayed_queue: &FuturesUnordered<BoxFuture<'static, ()>>,
) where
E: Send + 'static,
Sender: overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>,
{
match sender.try_send_message(event) {
Ok(()) => {},
Err(overseer::TrySendError::Full(event)) => {
let mut sender = sender.clone();
delayed_queue.push(Box::pin(async move {
sender.send_message(event).await;
}));
},
Err(overseer::TrySendError::Closed(_)) => {
panic!(
"NetworkBridgeRxSender is closed when trying to send event of type: {}",
std::any::type_name::<E>()
);
},
}
}
async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
metrics: &Metrics,
_metrics: &Metrics,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
let delayed_messages: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
// Fast path for sending events to subsystems, if any subsystem's queue is full, we hold
// the slow path future in the `delayed_messages` queue.
for event in events {
if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
send_or_queue_validation_event(msg, sender, &delayed_messages);
}
}
let delayed_messages_count = delayed_messages.len();
metrics.on_delayed_rx_queue(delayed_messages_count);
if delayed_messages_count > 0 {
// Here we wait for all the delayed messages to be sent.
let _timer = metrics.time_delayed_rx_events(); // Dropped after `await` is completed
let _: Vec<()> = delayed_messages.collect().await;
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
}
}