Convert SyncingEngine::run to use tokio::select! instead of polling (#2132)

This commit is contained in:
Dmitry Markin
2023-11-03 17:10:24 +02:00
committed by GitHub
parent 8dc41ba49d
commit d512b3f00b
4 changed files with 241 additions and 220 deletions
@@ -16,10 +16,11 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! `BlockAnnounceValidator` is responsible for async validation of block announcements.
//! [`BlockAnnounceValidator`] is responsible for async validation of block announcements.
//! [`Stream`] implemented by [`BlockAnnounceValidator`] never terminates.
use crate::futures_stream::FuturesStream;
use futures::{Future, FutureExt, Stream, StreamExt};
use futures::{stream::FusedStream, Future, FutureExt, Stream, StreamExt};
use libp2p::PeerId;
use log::{debug, error, trace, warn};
use sc_network_common::sync::message::BlockAnnounce;
@@ -300,6 +301,13 @@ impl<B: BlockT> Stream for BlockAnnounceValidator<B> {
}
}
// As [`BlockAnnounceValidator`] never terminates, we can easily implement [`FusedStream`] for it.
impl<B: BlockT> FusedStream for BlockAnnounceValidator<B> {
/// Always `false`: per the module docs, the [`Stream`] implemented by
/// [`BlockAnnounceValidator`] never terminates, so it is always safe to keep
/// polling it (e.g. from `select!`).
fn is_terminated(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
+206 -209
View File
@@ -47,7 +47,6 @@ use futures::{
future::{BoxFuture, Fuse},
FutureExt, StreamExt,
};
use futures_timer::Delay;
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, trace};
use prometheus_endpoint::{
@@ -56,6 +55,7 @@ use prometheus_endpoint::{
};
use prost::Message;
use schnellru::{ByLength, LruMap};
use tokio::time::{Interval, MissedTickBehavior};
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
@@ -85,7 +85,6 @@ use std::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
task::Poll,
time::{Duration, Instant},
};
@@ -254,7 +253,7 @@ pub struct SyncingEngine<B: BlockT, Client> {
service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
/// Channel for receiving inbound connections from `Protocol`.
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
sync_events_rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
/// Assigned roles.
roles: Roles,
@@ -266,7 +265,7 @@ pub struct SyncingEngine<B: BlockT, Client> {
event_streams: Vec<TracingUnboundedSender<SyncEvent>>,
/// Interval at which we call `tick`.
tick_timeout: Delay,
tick_timeout: Interval,
/// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,
@@ -304,7 +303,7 @@ pub struct SyncingEngine<B: BlockT, Client> {
boot_node_ids: HashSet<PeerId>,
/// A channel to get target block header if we skip over proofs downloading during warp sync.
warp_sync_target_block_header_rx:
warp_sync_target_block_header_rx_fused:
Fuse<BoxFuture<'static, Result<B::Header, oneshot::Canceled>>>,
/// Protocol name used for block announcements
@@ -363,7 +362,7 @@ where
block_downloader: Arc<dyn BlockDownloader<B>>,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
sync_events_rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
@@ -436,7 +435,7 @@ where
// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx
let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
let block_announce_config = Self::get_block_announce_proto_config(
@@ -478,6 +477,12 @@ where
let max_out_peers = net_config.network_config.default_peers_set.out_peers;
let max_in_peers = (max_full_peers - max_out_peers) as usize;
let tick_timeout = {
let mut interval = tokio::time::interval(TICK_TIMEOUT);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval
};
Ok((
Self {
roles,
@@ -493,11 +498,11 @@ where
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
service_rx,
rx,
sync_events_rx,
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
warp_sync_target_block_header_rx,
warp_sync_target_block_header_rx_fused,
boot_node_ids,
default_peers_set_no_slot_peers,
default_peers_set_num_full,
@@ -505,7 +510,7 @@ where
num_in_peers: 0usize,
max_in_peers,
event_streams: Vec::new(),
tick_timeout: Delay::new(TICK_TIMEOUT),
tick_timeout,
syncing_started: None,
last_notification_io: Instant::now(),
metrics: if let Some(r) = metrics_registry {
@@ -691,230 +696,222 @@ where
self.syncing_started = Some(Instant::now());
loop {
futures::future::poll_fn(|cx| self.poll(cx)).await;
tokio::select! {
_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
command = self.service_rx.select_next_some() =>
self.process_service_command(command),
sync_event = self.sync_events_rx.select_next_some() =>
self.process_sync_event(sync_event),
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
self.pass_warp_sync_target_block_header(warp_target_block_header),
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
self.process_block_announce_validation_result(validation_result),
}
// Update atomic variables
self.num_connected.store(self.peers.len(), Ordering::Relaxed);
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
// Send outbound requests on `ChainSync`'s behalf.
self.send_chain_sync_requests();
}
}
pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> {
self.num_connected.store(self.peers.len(), Ordering::Relaxed);
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
fn perform_periodic_actions(&mut self) {
self.report_metrics();
while let Poll::Ready(()) = self.tick_timeout.poll_unpin(cx) {
self.report_metrics();
self.tick_timeout.reset(TICK_TIMEOUT);
// if `SyncingEngine` has just started, don't evict seemingly inactive peers right away
// as they may not have produced blocks not because they've disconnected but because
// they're still waiting to receive enough relaychain blocks to start producing blocks.
if let Some(started) = self.syncing_started {
if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD {
continue
}
self.syncing_started = None;
self.last_notification_io = Instant::now();
// if `SyncingEngine` has just started, don't evict seemingly inactive peers right away
// as they may not have produced blocks not because they've disconnected but because
// they're still waiting to receive enough relaychain blocks to start producing blocks.
if let Some(started) = self.syncing_started {
if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD {
return
}
// if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
// it means the local node has stalled and is connected to peers who either don't
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(
target: LOG_TARGET,
"syncing has halted due to inactivity, evicting all peers",
);
for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
self.network_service
.disconnect_peer(*peer, self.block_announce_protocol_name.clone());
}
// after all the peers have been evicted, start timer again to prevent evicting
// new peers that join after the old peer have been evicted
self.last_notification_io = Instant::now();
}
self.syncing_started = None;
self.last_notification_io = Instant::now();
}
while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) {
match event {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.chain_sync.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
ToServiceCommand::RequestJustification(hash, number) =>
self.chain_sync.request_justification(&hash, number),
ToServiceCommand::ClearJustificationRequests =>
self.chain_sync.clear_justification_requests(),
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
match result {
Ok(action) => match action {
BlockRequestAction::SendRequest { peer_id, request } => {
// drop obsolete pending response first
self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request);
},
BlockRequestAction::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
},
},
Err(BadPeer(peer_id, repu)) => {
// if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
// it means the local node has stalled and is connected to peers who either don't
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(
target: LOG_TARGET,
"syncing has halted due to inactivity, evicting all peers",
);
for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
self.network_service
.disconnect_peer(*peer, self.block_announce_protocol_name.clone());
}
// after all the peers have been evicted, start timer again to prevent evicting
// new peers that join after the old peer have been evicted
self.last_notification_io = Instant::now();
}
}
fn process_service_command(&mut self, command: ToServiceCommand<B>) {
match command {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.chain_sync.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
ToServiceCommand::RequestJustification(hash, number) =>
self.chain_sync.request_justification(&hash, number),
ToServiceCommand::ClearJustificationRequests =>
self.chain_sync.clear_justification_requests(),
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
match result {
Ok(action) => match action {
BlockRequestAction::SendRequest { peer_id, request } => {
// drop obsolete pending response first
self.pending_responses.remove(&peer_id);
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
self.network_service.report_peer(peer_id, repu)
self.send_block_request(peer_id, request);
},
}
BlockRequestAction::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
},
},
Err(BadPeer(peer_id, repu)) => {
self.pending_responses.remove(&peer_id);
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
self.network_service.report_peer(peer_id, repu)
},
}
},
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
if !success {
log::info!(
target: LOG_TARGET,
"💔 Invalid justification provided by {peer_id} for #{hash}",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(
peer_id,
ReputationChange::new_fatal("Invalid justification"),
);
}
},
ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
ToServiceCommand::NewBestBlockImported(hash, number) =>
self.new_best_block_imported(hash, number),
ToServiceCommand::Status(tx) => {
let mut status = self.chain_sync.status();
status.num_connected_peers = self.peers.len() as u32;
let _ = tx.send(status);
},
ToServiceCommand::NumActivePeers(tx) => {
let _ = tx.send(self.num_active_peers());
},
ToServiceCommand::SyncState(tx) => {
let _ = tx.send(self.chain_sync.status());
},
ToServiceCommand::BestSeenBlock(tx) => {
let _ = tx.send(self.chain_sync.status().best_seen_block);
},
ToServiceCommand::NumSyncPeers(tx) => {
let _ = tx.send(self.chain_sync.status().num_peers);
},
ToServiceCommand::NumQueuedBlocks(tx) => {
let _ = tx.send(self.chain_sync.status().queued_blocks);
},
ToServiceCommand::NumDownloadedBlocks(tx) => {
let _ = tx.send(self.chain_sync.num_downloaded_blocks());
},
ToServiceCommand::NumSyncRequests(tx) => {
let _ = tx.send(self.chain_sync.num_sync_requests());
},
ToServiceCommand::PeersInfo(tx) => {
let peers_info = self
.peers
.iter()
.map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
.collect();
let _ = tx.send(peers_info);
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
self.chain_sync.on_block_finalized(&hash, *header.number()),
}
}
},
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
if !success {
log::info!(
target: LOG_TARGET,
"💔 Invalid justification provided by {peer_id} for #{hash}",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service
.report_peer(peer_id, ReputationChange::new_fatal("Invalid justification"));
}
},
ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
ToServiceCommand::NewBestBlockImported(hash, number) =>
self.new_best_block_imported(hash, number),
ToServiceCommand::Status(tx) => {
let mut status = self.chain_sync.status();
status.num_connected_peers = self.peers.len() as u32;
let _ = tx.send(status);
},
ToServiceCommand::NumActivePeers(tx) => {
let _ = tx.send(self.num_active_peers());
},
ToServiceCommand::SyncState(tx) => {
let _ = tx.send(self.chain_sync.status());
},
ToServiceCommand::BestSeenBlock(tx) => {
let _ = tx.send(self.chain_sync.status().best_seen_block);
},
ToServiceCommand::NumSyncPeers(tx) => {
let _ = tx.send(self.chain_sync.status().num_peers);
},
ToServiceCommand::NumQueuedBlocks(tx) => {
let _ = tx.send(self.chain_sync.status().queued_blocks);
},
ToServiceCommand::NumDownloadedBlocks(tx) => {
let _ = tx.send(self.chain_sync.num_downloaded_blocks());
},
ToServiceCommand::NumSyncRequests(tx) => {
let _ = tx.send(self.chain_sync.num_sync_requests());
},
ToServiceCommand::PeersInfo(tx) => {
let peers_info = self
.peers
.iter()
.map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
.collect();
let _ = tx.send(peers_info);
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
self.chain_sync.on_block_finalized(&hash, *header.number()),
}
}
while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) {
match event {
sc_network::SyncEvent::NotificationStreamOpened {
remote,
received_handshake,
sink,
inbound,
tx,
} => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) {
Ok(()) => {
let _ = tx.send(true);
},
Err(()) => {
log::debug!(
target: LOG_TARGET,
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
},
fn process_sync_event(&mut self, event: sc_network::SyncEvent<B>) {
match event {
sc_network::SyncEvent::NotificationStreamOpened {
remote,
received_handshake,
sink,
inbound,
tx,
} => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) {
Ok(()) => {
let _ = tx.send(true);
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
Err(()) => {
log::debug!(
target: LOG_TARGET,
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
},
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: LOG_TARGET,
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
);
}
},
sc_network::SyncEvent::NotificationsReceived { remote, messages } => {
for message in messages {
if self.peers.contains_key(&remote) {
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.last_notification_io = Instant::now();
self.push_block_announce_validation(remote, announce);
} else {
log::warn!(target: "sub-libp2p", "Failed to decode block announce");
}
} else {
log::trace!(
target: LOG_TARGET,
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
"Received sync for peer earlier refused by sync layer: {remote}",
);
}
},
sc_network::SyncEvent::NotificationsReceived { remote, messages } => {
for message in messages {
if self.peers.contains_key(&remote) {
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.last_notification_io = Instant::now();
self.push_block_announce_validation(remote, announce);
} else {
log::warn!(target: "sub-libp2p", "Failed to decode block announce");
}
} else {
log::trace!(
target: LOG_TARGET,
"Received sync for peer earlier refused by sync layer: {remote}",
);
}
}
},
sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => {
if let Some(peer) = self.peers.get_mut(&remote) {
peer.sink = sink;
}
},
}
}
// Retrieve warp sync target block header just before polling `ChainSync`
// to make progress as soon as we receive it.
match self.warp_sync_target_block_header_rx.poll_unpin(cx) {
Poll::Ready(Ok(target)) => {
self.chain_sync.set_warp_sync_target_block(target);
}
},
Poll::Ready(Err(err)) => {
sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => {
if let Some(peer) = self.peers.get_mut(&remote) {
peer.sink = sink;
}
},
}
}
fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
match header {
Ok(header) => {
self.chain_sync.set_warp_sync_target_block(header);
},
Err(err) => {
log::error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
},
Poll::Pending => {},
}
// Send outbound requests on `ChainSync`'s behalf.
self.send_chain_sync_requests();
// Poll & process pending responses.
while let Poll::Ready(Some(event)) = self.pending_responses.poll_next_unpin(cx) {
self.process_response_event(event);
}
// Poll block announce validations last, because if a block announcement was received
// through the event stream between `SyncingEngine` and `Protocol` and the validation
// finished right after it is queued, the resulting block request (if any) can be sent
// right away.
while let Poll::Ready(Some(result)) = self.block_announce_validator.poll_next_unpin(cx) {
self.process_block_announce_validation_result(result);
}
Poll::Pending
}
/// Called by peer when it is disconnecting.
@@ -17,20 +17,20 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! [`PendingResponses`] is responsible for keeping track of pending responses and
//! polling them.
//! polling them. [`Stream`] implemented by [`PendingResponses`] never terminates.
use crate::types::PeerRequest;
use futures::{
channel::oneshot,
future::BoxFuture,
stream::{BoxStream, Stream},
stream::{BoxStream, FusedStream, Stream},
FutureExt, StreamExt,
};
use libp2p::PeerId;
use log::error;
use sc_network::request_responses::RequestFailure;
use sp_runtime::traits::Block as BlockT;
use std::task::{Context, Poll};
use std::task::{Context, Poll, Waker};
use tokio_stream::StreamMap;
/// Log target for this file.
@@ -53,11 +53,13 @@ pub(crate) struct ResponseEvent<B: BlockT> {
pub(crate) struct PendingResponses<B: BlockT> {
/// Pending responses
pending_responses: StreamMap<PeerId, BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
/// Waker to implement never terminating stream
waker: Option<Waker>,
}
impl<B: BlockT> PendingResponses<B> {
pub fn new() -> Self {
Self { pending_responses: StreamMap::new() }
Self { pending_responses: StreamMap::new(), waker: None }
}
pub fn insert(
@@ -82,6 +84,10 @@ impl<B: BlockT> PendingResponses<B> {
);
debug_assert!(false);
}
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
pub fn remove(&mut self, peer_id: &PeerId) -> bool {
@@ -93,8 +99,6 @@ impl<B: BlockT> PendingResponses<B> {
}
}
impl<B: BlockT> Unpin for PendingResponses<B> {}
impl<B: BlockT> Stream for PendingResponses<B> {
type Item = ResponseEvent<B>;
@@ -102,8 +106,8 @@ impl<B: BlockT> Stream for PendingResponses<B> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match futures::ready!(self.pending_responses.poll_next_unpin(cx)) {
Some((peer_id, (request, response))) => {
match self.pending_responses.poll_next_unpin(cx) {
Poll::Ready(Some((peer_id, (request, response)))) => {
// We need to manually remove the stream, because `StreamMap` doesn't know yet that
// it's going to yield `None`, so may not remove it before the next request is made
// to the same peer.
@@ -111,7 +115,18 @@ impl<B: BlockT> Stream for PendingResponses<B> {
Poll::Ready(Some(ResponseEvent { peer_id, request, response }))
},
None => Poll::Ready(None),
Poll::Ready(None) | Poll::Pending => {
self.waker = Some(cx.waker().clone());
Poll::Pending
},
}
}
}
// As [`PendingResponses`] never terminates, we can easily implement [`FusedStream`] for it.
impl<B: BlockT> FusedStream for PendingResponses<B> {
/// Always `false`: the [`Stream`] impl maps an exhausted inner `StreamMap`
/// to `Poll::Pending` (registering a waker) instead of yielding `None`, so
/// the stream never terminates and can be polled indefinitely from `select!`.
fn is_terminated(&self) -> bool {
false
}
}