Unify ChainSync actions under one enum (#2180)

All `ChainSync` actions that `SyncingEngine` should perform are unified
under one `ChainSyncAction`. Processing of these actions put into a
single place after `select!` in `SyncingEngine::run` instead of multiple
places where calling `ChainSync` methods.
This commit is contained in:
Dmitry Markin
2023-11-13 07:33:37 +02:00
committed by GitHub
parent 0c5dcca9e3
commit 951bcceba0
3 changed files with 314 additions and 328 deletions
+77 -86
View File
@@ -25,10 +25,7 @@ use crate::{
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
chain_sync::{
BlockRequestAction, ChainSync, ImportBlocksAction, ImportJustificationsAction,
OnBlockResponse, OnStateResponse,
},
chain_sync::{ChainSync, ChainSyncAction},
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{
@@ -58,7 +55,7 @@ use schnellru::{ByLength, LruMap};
use tokio::time::{Interval, MissedTickBehavior};
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
use sc_network::{
config::{
FullNetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake,
@@ -74,8 +71,11 @@ use sc_network_common::{
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::{Error as ClientError, HeaderMetadata};
use sp_consensus::block_validation::BlockAnnounceValidator;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
use sp_runtime::{
traits::{Block as BlockT, Header, NumberFor, Zero},
Justifications,
};
use std::{
collections::{HashMap, HashSet},
@@ -713,11 +713,67 @@ where
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
// Process actions requested by `ChainSync` during `select!`.
self.process_chain_sync_actions();
// Send outbound requests on `ChanSync`'s behalf.
self.send_chain_sync_requests();
}
}
fn process_chain_sync_actions(&mut self) {
self.chain_sync.take_actions().for_each(|action| match action {
ChainSyncAction::SendBlockRequest { peer_id, request } => {
// Sending block request implies dropping obsolete pending response as we are not
// interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
peer_id,
request,
removed,
)
},
ChainSyncAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}.");
},
ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
trace!(target: LOG_TARGET, "Processed {action:?}.");
},
ChainSyncAction::ImportBlocks { origin, blocks } => {
let count = blocks.len();
self.import_blocks(origin, blocks);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
);
},
ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => {
self.import_justifications(peer_id, hash, number, justifications);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
peer_id,
hash,
number,
)
},
});
}
fn perform_periodic_actions(&mut self) {
self.report_metrics();
@@ -766,28 +822,7 @@ where
ToServiceCommand::ClearJustificationRequests =>
self.chain_sync.clear_justification_requests(),
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
match result {
Ok(action) => match action {
BlockRequestAction::SendRequest { peer_id, request } => {
// drop obsolete pending response first
self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request);
},
BlockRequestAction::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
},
},
Err(BadPeer(peer_id, repu)) => {
self.pending_responses.remove(&peer_id);
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
self.network_service.report_peer(peer_id, repu)
},
}
}
self.chain_sync.on_blocks_processed(imported, count, results);
},
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
@@ -940,9 +975,7 @@ where
}
}
if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) {
self.import_blocks(import_blocks_action)
}
self.chain_sync.peer_disconnected(&peer_id);
self.pending_responses.remove(&peer_id);
self.event_streams.retain(|stream| {
@@ -1053,17 +1086,7 @@ where
inbound,
};
let req = if peer.info.roles.is_full() {
match self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number) {
Ok(req) => req,
Err(BadPeer(id, repu)) => {
self.network_service.report_peer(id, repu);
return Err(())
},
}
} else {
None
};
self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number);
log::debug!(target: LOG_TARGET, "Connected {peer_id}");
@@ -1075,10 +1098,6 @@ where
self.num_in_peers += 1;
}
if let Some(req) = req {
self.send_block_request(peer_id, req);
}
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
@@ -1202,22 +1221,7 @@ where
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
match self.chain_sync.on_block_response(peer_id, req, blocks) {
OnBlockResponse::SendBlockRequest { peer_id, request } =>
self.send_block_request(peer_id, request),
OnBlockResponse::ImportBlocks(import_blocks_action) =>
self.import_blocks(import_blocks_action),
OnBlockResponse::ImportJustifications(action) =>
self.import_justifications(action),
OnBlockResponse::Nothing => {},
OnBlockResponse::DisconnectPeer(BadPeer(peer_id, rep)) => {
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
self.network_service.report_peer(peer_id, rep);
},
}
self.chain_sync.on_block_response(peer_id, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
@@ -1262,27 +1266,10 @@ where
},
};
match self.chain_sync.on_state_response(peer_id, response) {
OnStateResponse::ImportBlocks(import_blocks_action) =>
self.import_blocks(import_blocks_action),
OnStateResponse::DisconnectPeer(BadPeer(peer_id, rep)) => {
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
self.network_service.report_peer(peer_id, rep);
},
OnStateResponse::Nothing => {},
}
self.chain_sync.on_state_response(peer_id, response);
},
PeerRequest::WarpProof => {
if let Err(BadPeer(peer_id, rep)) =
self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp))
{
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
}
self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp));
},
},
Ok(Err(e)) => {
@@ -1388,7 +1375,7 @@ where
}
/// Import blocks.
fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction<B>) {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_blocks_submitted.inc();
}
@@ -1397,13 +1384,17 @@ where
}
/// Import justifications.
fn import_justifications(&mut self, action: ImportJustificationsAction<B>) {
fn import_justifications(
&mut self,
peer_id: PeerId,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_justifications_submitted.inc();
}
let ImportJustificationsAction { peer_id, hash, number, justifications } = action;
self.import_queue.import_justifications(peer_id, hash, number, justifications);
}
}