mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 20:57:59 +00:00
Prepare syncing for parallel sync strategies (#3224)
This PR should supersede https://github.com/paritytech/polkadot-sdk/pull/2814 and accomplish the same with less changes. It's needed to run sync strategies in parallel, like running `ChainSync` and `GapSync` as independent strategies, and running `ChainSync` and Sync 2.0 alongside each other. The difference with https://github.com/paritytech/polkadot-sdk/pull/2814 is that we allow simultaneous requests to remote peers initiated by different strategies, as this is not tracked on the remote node in any way. Therefore, `PeerPool` is not needed. CC @skunert --------- Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
@@ -33,7 +33,7 @@ use crate::{
|
||||
},
|
||||
strategy::{
|
||||
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
|
||||
SyncingAction, SyncingConfig, SyncingStrategy,
|
||||
StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
|
||||
},
|
||||
types::{
|
||||
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
|
||||
@@ -48,7 +48,7 @@ use futures::{
|
||||
FutureExt, StreamExt,
|
||||
};
|
||||
use libp2p::{request_response::OutboundFailure, PeerId};
|
||||
use log::{debug, error, trace};
|
||||
use log::{debug, error, trace, warn};
|
||||
use prometheus_endpoint::{
|
||||
register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
|
||||
};
|
||||
@@ -214,9 +214,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
|
||||
/// Syncing strategy.
|
||||
strategy: SyncingStrategy<B, Client>,
|
||||
|
||||
/// Syncing configuration for startegies.
|
||||
syncing_config: SyncingConfig,
|
||||
|
||||
/// Blockchain client.
|
||||
client: Arc<Client>,
|
||||
|
||||
@@ -441,8 +438,7 @@ where
|
||||
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
|
||||
|
||||
// Initialize syncing strategy.
|
||||
let strategy =
|
||||
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
|
||||
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;
|
||||
|
||||
let block_announce_protocol_name = block_announce_config.protocol_name().clone();
|
||||
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
|
||||
@@ -471,7 +467,6 @@ where
|
||||
roles,
|
||||
client,
|
||||
strategy,
|
||||
syncing_config,
|
||||
network_service,
|
||||
peers: HashMap::new(),
|
||||
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
|
||||
@@ -661,8 +656,15 @@ where
|
||||
Some(event) => self.process_notification_event(event),
|
||||
None => return,
|
||||
},
|
||||
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
|
||||
self.pass_warp_sync_target_block_header(warp_target_block_header),
|
||||
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
|
||||
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
|
||||
);
|
||||
return
|
||||
}
|
||||
},
|
||||
response_event = self.pending_responses.select_next_some() =>
|
||||
self.process_response_event(response_event),
|
||||
validation_result = self.block_announce_validator.select_next_some() =>
|
||||
@@ -675,48 +677,61 @@ where
|
||||
|
||||
// Process actions requested by a syncing strategy.
|
||||
if let Err(e) = self.process_strategy_actions() {
|
||||
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Terminating `SyncingEngine` due to fatal error: {e:?}.",
|
||||
);
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
|
||||
for action in self.strategy.actions() {
|
||||
for action in self.strategy.actions()? {
|
||||
match action {
|
||||
SyncingAction::SendBlockRequest { peer_id, request } => {
|
||||
SyncingAction::SendBlockRequest { peer_id, key, request } => {
|
||||
// Sending block request implies dropping obsolete pending response as we are
|
||||
// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
|
||||
// Furthermore, only one request at a time is allowed to any peer.
|
||||
let removed = self.pending_responses.remove(&peer_id);
|
||||
self.send_block_request(peer_id, request.clone());
|
||||
let removed = self.pending_responses.remove(peer_id, key);
|
||||
self.send_block_request(peer_id, key, request.clone());
|
||||
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
|
||||
peer_id,
|
||||
request,
|
||||
removed,
|
||||
)
|
||||
if removed {
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \
|
||||
Stale response removed!",
|
||||
peer_id,
|
||||
key,
|
||||
request,
|
||||
)
|
||||
} else {
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.",
|
||||
peer_id,
|
||||
key,
|
||||
request,
|
||||
)
|
||||
}
|
||||
},
|
||||
SyncingAction::CancelBlockRequest { peer_id } => {
|
||||
let removed = self.pending_responses.remove(&peer_id);
|
||||
SyncingAction::CancelRequest { peer_id, key } => {
|
||||
let removed = self.pending_responses.remove(peer_id, key);
|
||||
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Processed {action:?}, response removed: {removed}.",
|
||||
);
|
||||
},
|
||||
SyncingAction::SendStateRequest { peer_id, request } => {
|
||||
self.send_state_request(peer_id, request);
|
||||
SyncingAction::SendStateRequest { peer_id, key, request } => {
|
||||
self.send_state_request(peer_id, key, request);
|
||||
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
|
||||
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
|
||||
);
|
||||
},
|
||||
SyncingAction::SendWarpProofRequest { peer_id, request } => {
|
||||
self.send_warp_proof_request(peer_id, request.clone());
|
||||
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
|
||||
self.send_warp_proof_request(peer_id, key, request.clone());
|
||||
|
||||
trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -726,7 +741,7 @@ where
|
||||
);
|
||||
},
|
||||
SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
|
||||
self.pending_responses.remove(&peer_id);
|
||||
self.pending_responses.remove_all(&peer_id);
|
||||
self.network_service
|
||||
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
|
||||
self.network_service.report_peer(peer_id, rep);
|
||||
@@ -753,20 +768,8 @@ where
|
||||
number,
|
||||
)
|
||||
},
|
||||
SyncingAction::Finished => {
|
||||
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
|
||||
peer.info.roles.is_full().then_some((
|
||||
*peer_id,
|
||||
peer.info.best_hash,
|
||||
peer.info.best_number,
|
||||
))
|
||||
});
|
||||
self.strategy.switch_to_next(
|
||||
self.syncing_config.clone(),
|
||||
self.client.clone(),
|
||||
connected_peers,
|
||||
)?;
|
||||
},
|
||||
// Nothing to do, this is handled internally by `SyncingStrategy`.
|
||||
SyncingAction::Finished => {},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -948,23 +951,18 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
|
||||
fn pass_warp_sync_target_block_header(
|
||||
&mut self,
|
||||
header: Result<B::Header, oneshot::Canceled>,
|
||||
) -> Result<(), ()> {
|
||||
match header {
|
||||
Ok(header) =>
|
||||
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
|
||||
warp_sync.set_target_block(header);
|
||||
} else {
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot set warp sync target block: no warp sync strategy is active."
|
||||
);
|
||||
debug_assert!(false);
|
||||
},
|
||||
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
|
||||
Err(err) => {
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to get target block for warp sync. Error: {err:?}",
|
||||
);
|
||||
Err(())
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -1002,7 +1000,7 @@ where
|
||||
}
|
||||
|
||||
self.strategy.remove_peer(&peer_id);
|
||||
self.pending_responses.remove(&peer_id);
|
||||
self.pending_responses.remove_all(&peer_id);
|
||||
self.event_streams
|
||||
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
|
||||
}
|
||||
@@ -1167,7 +1165,7 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
|
||||
fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest<B>) {
|
||||
if !self.peers.contains_key(&peer_id) {
|
||||
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
|
||||
debug_assert!(false);
|
||||
@@ -1178,12 +1176,18 @@ where
|
||||
|
||||
self.pending_responses.insert(
|
||||
peer_id,
|
||||
key,
|
||||
PeerRequest::Block(request.clone()),
|
||||
async move { downloader.download_blocks(peer_id, request).await }.boxed(),
|
||||
);
|
||||
}
|
||||
|
||||
fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
|
||||
fn send_state_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
key: StrategyKey,
|
||||
request: OpaqueStateRequest,
|
||||
) {
|
||||
if !self.peers.contains_key(&peer_id) {
|
||||
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
|
||||
debug_assert!(false);
|
||||
@@ -1192,7 +1196,7 @@ where
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());
|
||||
self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());
|
||||
|
||||
match Self::encode_state_request(&request) {
|
||||
Ok(data) => {
|
||||
@@ -1213,7 +1217,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
|
||||
fn send_warp_proof_request(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
key: StrategyKey,
|
||||
request: WarpProofRequest<B>,
|
||||
) {
|
||||
if !self.peers.contains_key(&peer_id) {
|
||||
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
|
||||
debug_assert!(false);
|
||||
@@ -1222,7 +1231,7 @@ where
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed());
|
||||
self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());
|
||||
|
||||
match &self.warp_sync_protocol_name {
|
||||
Some(name) => self.network_service.start_request(
|
||||
@@ -1259,14 +1268,14 @@ where
|
||||
}
|
||||
|
||||
fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
|
||||
let ResponseEvent { peer_id, request, response } = response_event;
|
||||
let ResponseEvent { peer_id, key, request, response } = response_event;
|
||||
|
||||
match response {
|
||||
Ok(Ok((resp, _))) => match request {
|
||||
PeerRequest::Block(req) => {
|
||||
match self.block_downloader.block_response_into_blocks(&req, resp) {
|
||||
Ok(blocks) => {
|
||||
self.strategy.on_block_response(peer_id, req, blocks);
|
||||
self.strategy.on_block_response(peer_id, key, req, blocks);
|
||||
},
|
||||
Err(BlockResponseError::DecodeFailed(e)) => {
|
||||
debug!(
|
||||
@@ -1311,10 +1320,10 @@ where
|
||||
},
|
||||
};
|
||||
|
||||
self.strategy.on_state_response(peer_id, response);
|
||||
self.strategy.on_state_response(peer_id, key, response);
|
||||
},
|
||||
PeerRequest::WarpProof => {
|
||||
self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp));
|
||||
self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
|
||||
},
|
||||
},
|
||||
Ok(Err(e)) => {
|
||||
|
||||
Reference in New Issue
Block a user