Attempt to relieve pressure on mpsc_network_worker (#13725)

* Attempt to relieve pressure on `mpsc_network_worker`

`SyncingEngine` interacting with `NetworkWorker` can put a lot of strain
on the channel if the number of inbound connections is high. This is
because `SyncingEngine` is notified of each inbound substream, which it
can then either accept or reject, and this causes a lot of message
exchange on the already busy channel.

Use a direct channel pair between `Protocol` and `SyncingEngine`
to exchange notification events. It is a temporary change to alleviate
the problems caused by syncing being an independent protocol and the
fix will be removed once `NotificationService` is implemented.

* Apply review comments

* fixes

* trigger ci

* Fix tests

Verify that both peers have a connection now that the validation goes
through `SyncingEngine`. Depending on how the tasks are scheduled,
one of them might not have the peer registered in `SyncingEngine`, at which
point the test won't make any progress because a block announcement received
from an unknown peer is discarded.

Move polling of `ChainSync` to the end of the function so that if a block
announcement causes a block request to be sent, that request can be sent in
the same call to `SyncingEngine::poll()`.

---------

Co-authored-by: parity-processbot <>
This commit is contained in:
Aaro Altonen
2023-03-30 14:59:58 +03:00
committed by GitHub
parent 7985495b8c
commit 4240490d1d
12 changed files with 259 additions and 153 deletions
+87 -112
View File
@@ -24,8 +24,8 @@ use crate::{
ChainSync, ClientError, SyncingService,
};
use codec::{Decode, DecodeAll, Encode};
use futures::{FutureExt, Stream, StreamExt};
use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p::PeerId;
use lru::LruCache;
@@ -39,9 +39,8 @@ use sc_network::{
config::{
NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
},
event::Event,
utils::LruHashSet,
ProtocolName,
NotificationsSink, ProtocolName,
};
use sc_network_common::{
role::Roles,
@@ -63,7 +62,6 @@ use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
@@ -79,8 +77,6 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
mod rep {
use sc_peerset::ReputationChange as Rep;
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// Peer has different genesis.
pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
/// Peer sent us a block announcement that failed at validation.
@@ -162,6 +158,8 @@ pub struct Peer<B: BlockT> {
pub info: ExtendedPeerInfo<B>,
/// Holds a set of blocks known to this peer.
pub known_blocks: LruHashSet<B::Hash>,
/// Notification sink.
sink: NotificationsSink,
}
pub struct SyncingEngine<B: BlockT, Client> {
@@ -184,6 +182,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Channel for receiving service commands
service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
/// Channel for receiving inbound connections from `Protocol`.
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
/// Assigned roles.
roles: Roles,
@@ -254,6 +255,7 @@ where
block_request_protocol_name: ProtocolName,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = match network_config.sync_mode {
SyncOperationMode::Full => SyncMode::Full,
@@ -347,6 +349,7 @@ where
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
service_rx,
rx,
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
@@ -554,11 +557,7 @@ where
data: Some(data.clone()),
};
self.network_service.write_notification(
*who,
self.block_announce_protocol_name.clone(),
message.encode(),
);
peer.sink.send_sync_notification(message.encode());
}
}
}
@@ -575,17 +574,13 @@ where
)
}
pub async fn run(mut self, mut stream: Pin<Box<dyn Stream<Item = Event> + Send>>) {
pub async fn run(mut self) {
loop {
futures::future::poll_fn(|cx| self.poll(cx, &mut stream)).await;
futures::future::poll_fn(|cx| self.poll(cx)).await;
}
}
pub fn poll(
&mut self,
cx: &mut std::task::Context,
event_stream: &mut Pin<Box<dyn Stream<Item = Event> + Send>>,
) -> Poll<()> {
pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> {
self.num_connected.store(self.peers.len(), Ordering::Relaxed);
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
@@ -595,84 +590,6 @@ where
self.tick_timeout.reset(TICK_TIMEOUT);
}
while let Poll::Ready(Some(event)) = event_stream.poll_next_unpin(cx) {
match event {
Event::NotificationStreamOpened {
remote, protocol, received_handshake, ..
} => {
if protocol != self.block_announce_protocol_name {
continue
}
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(
&mut &received_handshake[..],
) {
Ok(handshake) => {
if self.on_sync_peer_connected(remote, handshake).is_err() {
log::debug!(
target: "sync",
"Failed to register peer {remote:?}: {received_handshake:?}",
);
}
},
Err(err) => {
log::debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
remote,
received_handshake,
err,
);
self.network_service.report_peer(remote, rep::BAD_MESSAGE);
},
}
},
Event::NotificationStreamClosed { remote, protocol } => {
if protocol != self.block_announce_protocol_name {
continue
}
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
);
}
},
Event::NotificationsReceived { remote, messages } => {
for (protocol, message) in messages {
if protocol != self.block_announce_protocol_name {
continue
}
if self.peers.contains_key(&remote) {
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.push_block_announce_validation(remote, announce);
// Make sure that the newly added block announce validation future
// was polled once to be registered in the task.
if let Poll::Ready(res) =
self.chain_sync.poll_block_announce_validation(cx)
{
self.process_block_announce_validation_result(res)
}
} else {
log::warn!(target: "sub-libp2p", "Failed to decode block announce");
}
} else {
log::trace!(
target: "sync",
"Received sync for peer earlier refused by sync layer: {}",
remote
);
}
}
},
_ => {},
}
}
while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) {
match event {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
@@ -746,6 +663,70 @@ where
}
}
while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) {
match event {
sc_network::SyncEvent::NotificationStreamOpened {
remote,
received_handshake,
sink,
tx,
} => match self.on_sync_peer_connected(remote, &received_handshake, sink) {
Ok(()) => {
let _ = tx.send(true);
},
Err(()) => {
log::debug!(
target: "sync",
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
},
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
);
}
},
sc_network::SyncEvent::NotificationsReceived { remote, messages } => {
for message in messages {
if self.peers.contains_key(&remote) {
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.push_block_announce_validation(remote, announce);
// Make sure that the newly added block announce validation future
// was polled once to be registered in the task.
if let Poll::Ready(res) =
self.chain_sync.poll_block_announce_validation(cx)
{
self.process_block_announce_validation_result(res)
}
} else {
log::warn!(target: "sub-libp2p", "Failed to decode block announce");
}
} else {
log::trace!(
target: "sync",
"Received sync for peer earlier refused by sync layer: {}",
remote
);
}
}
},
sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => {
if let Some(peer) = self.peers.get_mut(&remote) {
peer.sink = sink;
}
},
}
}
// poll `ChainSync` last because if a block announcement was received through the
// event stream between `SyncingEngine` and `Protocol` and the validation finished
// right after it was queued, the resulting block request (if any) can be sent right away.
while let Poll::Ready(result) = self.chain_sync.poll(cx) {
self.process_block_announce_validation_result(result);
}
@@ -757,13 +738,13 @@ where
///
/// Returns a result if the handshake of this peer was indeed accepted.
pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> {
if self.important_peers.contains(&peer) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer);
} else {
log::debug!(target: "sync", "{} disconnected", peer);
}
if self.peers.remove(&peer).is_some() {
if self.important_peers.contains(&peer) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer);
} else {
log::debug!(target: "sync", "{} disconnected", peer);
}
self.chain_sync.peer_disconnected(&peer);
self.default_peers_set_no_slot_connected_peers.remove(&peer);
self.event_streams
@@ -782,7 +763,8 @@ where
pub fn on_sync_peer_connected(
&mut self,
who: PeerId,
status: BlockAnnouncesHandshake<B>,
status: &BlockAnnouncesHandshake<B>,
sink: NotificationsSink,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", who, status);
@@ -794,8 +776,6 @@ where
if status.genesis_hash != self.genesis_hash {
self.network_service.report_peer(who, rep::GENESIS_MISMATCH);
self.network_service
.disconnect_peer(who, self.block_announce_protocol_name.clone());
if self.important_peers.contains(&who) {
log::error!(
@@ -834,8 +814,6 @@ where
this_peer_reserved_slot
{
log::debug!(target: "sync", "Too many full nodes, rejecting {}", who);
self.network_service
.disconnect_peer(who, self.block_announce_protocol_name.clone());
return Err(())
}
@@ -844,8 +822,6 @@ where
{
// Make sure that not all slots are occupied by light clients.
log::debug!(target: "sync", "Too many light nodes, rejecting {}", who);
self.network_service
.disconnect_peer(who, self.block_announce_protocol_name.clone());
return Err(())
}
@@ -858,14 +834,13 @@ where
known_blocks: LruHashSet::new(
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
),
sink,
};
let req = if peer.info.roles.is_full() {
match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) {
Ok(req) => req,
Err(BadPeer(id, repu)) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(id, repu);
return Err(())
},