Poll the substream validation before polling Notifications (#13934)

* Poll the substream validation before polling `Notifications`

In tests, it can happen that `Notifications` doesn't produce any events
which causes `poll()` to return `Poll::Pending` and the substream
validation futures won't get polled.

Poll the futures before calling `Notifications` so results for substream
validations are received even if `Notifications` is not producing any
events.

* Remove `pending_messages`

* Remove unused import
This commit is contained in:
Aaro Altonen
2023-04-18 10:47:36 +03:00
committed by GitHub
parent d23a251ee9
commit 818976d98e
+16 -28
View File
@@ -40,7 +40,7 @@ use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
future::Future,
iter,
pin::Pin,
@@ -77,8 +77,6 @@ type PendingSyncSubstreamValidation =
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome>,
/// Used to report reputation changes.
peerset_handle: sc_peerset::PeersetHandle,
/// Handles opening the unique substream and sending and receiving raw messages.
@@ -181,7 +179,6 @@ impl<B: BlockT> Protocol<B> {
};
let protocol = Self {
pending_messages: VecDeque::new(),
peerset_handle: peerset_handle.clone(),
behaviour,
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
@@ -409,8 +406,21 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}
let event = match self.behaviour.poll(cx, params) {
@@ -430,23 +440,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};
while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}
let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
@@ -509,7 +502,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
) {
Ok(handshake) => {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
@@ -644,10 +636,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message))
}
// This block can only be reached if an event was pulled from the behaviour and that
// resulted in `CustomMessageOutcome::None`. Since there might be another pending
// message from the behaviour, the task is scheduled again.