Extract warp sync strategy from ChainSync (#2467)

Extract `WarpSync` (and `StateSync` as part of warp sync) from
`ChainSync` as independent syncing strategy called by `SyncingEngine`.
Introduce `SyncingStrategy` enum as a proxy between `SyncingEngine` and
specific syncing strategies.

## Limitations
Gap sync is kept in `ChainSync` for now because it shares the same set
of peers as block syncing implementation in `ChainSync`. Extraction of a
common context responsible for peer management in syncing strategies
able to run in parallel is planned for a follow-up PR.

## Further improvements
A possibility of conversion of `SyncingStrategy` into a trait should be
evaluated. The main stopper for this is that different strategies need
to communicate different actions to `SyncingEngine` and respond to
different events / provide different APIs (e.g., requesting
justifications is only possible via `ChainSync` and not through
`WarpSync`; `SendWarpProofRequest` action is only relevant to
`WarpSync`, etc.)

---------

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>
This commit is contained in:
Dmitry Markin
2024-01-12 17:17:15 +02:00
committed by GitHub
parent 5ed0a75fcd
commit 5208bed7d2
30 changed files with 3227 additions and 1016 deletions
+176 -177
View File
@@ -25,17 +25,20 @@ use crate::{
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
chain_sync::{ChainSync, ChainSyncAction},
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{
self,
syncing_service::{SyncingService, ToServiceCommand},
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
SyncingAction, SyncingConfig, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
},
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
LOG_TARGET,
};
use codec::{Decode, DecodeAll, Encode};
@@ -45,10 +48,9 @@ use futures::{
FutureExt, StreamExt,
};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, trace};
use log::{debug, error, trace};
use prometheus_endpoint::{
register, Counter, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry,
SourcedGauge, U64,
register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
use prost::Message;
use schnellru::{ByLength, LruMap};
@@ -97,9 +99,6 @@ const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100)
/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Logging target for the file.
const LOG_TARGET: &str = "sync";
/// If the block announces stream to peer has been inactive for 30 seconds meaning local node
/// has not sent or received block announcements to/from the peer, report the node for inactivity,
/// disconnect it and attempt to establish connection to some other peer.
@@ -140,9 +139,6 @@ mod rep {
struct Metrics {
peers: Gauge<U64>,
queued_blocks: Gauge<U64>,
fork_targets: Gauge<U64>,
justifications: GaugeVec<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
}
@@ -155,25 +151,6 @@ impl Metrics {
let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
register(g, r)?
},
queued_blocks: {
let g =
Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
register(g, r)?
},
fork_targets: {
let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
register(g, r)?
},
justifications: {
let g = GaugeVec::new(
Opts::new(
"substrate_sync_extra_justifications",
"Number of extra justifications requests",
),
&["status"],
)?;
register(g, r)?
},
import_queue_blocks_submitted: {
let c = Counter::new(
"substrate_sync_import_queue_blocks_submitted",
@@ -234,9 +211,11 @@ pub struct Peer<B: BlockT> {
}
pub struct SyncingEngine<B: BlockT, Client> {
/// State machine that handles the list of in-progress requests. Only full node peers are
/// registered.
chain_sync: ChainSync<B, Client>,
/// Syncing strategy.
strategy: SyncingStrategy<B, Client>,
/// Syncing configuration for strategies.
syncing_config: SyncingConfig,
/// Blockchain client.
client: Arc<Client>,
@@ -381,6 +360,12 @@ where
} else {
net_config.network_config.max_blocks_per_request
};
let syncing_config = SyncingConfig {
mode,
max_parallel_downloads,
max_blocks_per_request,
metrics_registry: metrics_registry.cloned(),
};
let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
net_config.network_config.default_peers_set.out_peers)
.max(1);
@@ -429,19 +414,6 @@ where
total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
};
// Split warp sync params into warp sync config and a channel to retreive target block
// header.
let (warp_sync_config, warp_sync_target_block_header_rx) =
warp_sync_params.map_or((None, None), |params| {
let (config, target_block_rx) = params.split();
(Some(config), target_block_rx)
});
// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
let (block_announce_config, notification_service) = Self::get_block_announce_proto_config(
protocol_id,
fork_id,
@@ -455,13 +427,22 @@ where
.expect("Genesis block exists; qed"),
);
let chain_sync = ChainSync::new(
mode,
client.clone(),
max_parallel_downloads,
max_blocks_per_request,
warp_sync_config,
)?;
// Split warp sync params into warp sync config and a channel to retreive target block
// header.
let (warp_sync_config, warp_sync_target_block_header_rx) =
warp_sync_params.map_or((None, None), |params| {
let (config, target_block_rx) = params.split();
(Some(config), target_block_rx)
});
// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx_fused = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
// Initialize syncing strategy.
let strategy =
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
@@ -489,7 +470,8 @@ where
Self {
roles,
client,
chain_sync,
strategy,
syncing_config,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
@@ -543,37 +525,19 @@ where
if let Some(metrics) = &self.metrics {
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
metrics.peers.set(n);
let m = self.chain_sync.metrics();
metrics.fork_targets.set(m.fork_targets.into());
metrics.queued_blocks.set(m.queued_blocks.into());
metrics
.justifications
.with_label_values(&["pending"])
.set(m.justifications.pending_requests.into());
metrics
.justifications
.with_label_values(&["active"])
.set(m.justifications.active_requests.into());
metrics
.justifications
.with_label_values(&["failed"])
.set(m.justifications.failed_requests.into());
metrics
.justifications
.with_label_values(&["importing"])
.set(m.justifications.importing_requests.into());
}
self.strategy.report_metrics();
}
fn update_peer_info(&mut self, peer_id: &PeerId) {
if let Some(info) = self.chain_sync.peer_info(peer_id) {
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
peer.info.best_hash = info.best_hash;
peer.info.best_number = info.best_number;
}
fn update_peer_info(
&mut self,
peer_id: &PeerId,
best_hash: B::Hash,
best_number: NumberFor<B>,
) {
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
peer.info.best_hash = best_hash;
peer.info.best_number = best_number;
}
}
@@ -585,9 +549,11 @@ where
match validation_result {
BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
self.chain_sync.on_validated_block_announce(is_new_best, peer_id, &announce);
self.update_peer_info(&peer_id);
if let Some((best_hash, best_number)) =
self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
{
self.update_peer_info(&peer_id, best_hash, best_number);
}
if let Some(data) = announce.data {
if !data.is_empty() {
@@ -705,83 +671,106 @@ where
// Update atomic variables
self.num_connected.store(self.peers.len(), Ordering::Relaxed);
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
// Process actions requested by `ChainSync`.
self.process_chain_sync_actions();
// Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() {
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
return
}
}
}
fn process_chain_sync_actions(&mut self) {
self.chain_sync.actions().for_each(|action| match action {
ChainSyncAction::SendBlockRequest { peer_id, request } => {
// Sending block request implies dropping obsolete pending response as we are not
// interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());
fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
for action in self.strategy.actions() {
match action {
SyncingAction::SendBlockRequest { peer_id, request } => {
// Sending block request implies dropping obsolete pending response as we are
// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
peer_id,
request,
removed,
)
},
ChainSyncAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
peer_id,
request,
removed,
)
},
SyncingAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}.");
},
ChainSyncAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);
trace!(
target: LOG_TARGET,
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
);
},
ChainSyncAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
peer_id,
request,
);
},
ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.",
peer_id,
request,
);
},
SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
trace!(target: LOG_TARGET, "Processed {action:?}.");
},
ChainSyncAction::ImportBlocks { origin, blocks } => {
let count = blocks.len();
self.import_blocks(origin, blocks);
trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
},
SyncingAction::ImportBlocks { origin, blocks } => {
let count = blocks.len();
self.import_blocks(origin, blocks);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
);
},
ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => {
self.import_justifications(peer_id, hash, number, justifications);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
);
},
SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
self.import_justifications(peer_id, hash, number, justifications);
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
peer_id,
hash,
number,
)
},
});
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
peer_id,
hash,
number,
)
},
SyncingAction::Finished => {
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
peer.info.roles.is_full().then_some((
*peer_id,
peer.info.best_hash,
peer.info.best_number,
))
});
self.strategy.switch_to_next(
self.syncing_config.clone(),
self.client.clone(),
connected_peers,
)?;
},
}
}
Ok(())
}
fn perform_periodic_actions(&mut self) {
@@ -824,18 +813,18 @@ where
fn process_service_command(&mut self, command: ToServiceCommand<B>) {
match command {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.chain_sync.set_sync_fork_request(peers, &hash, number);
self.strategy.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
ToServiceCommand::RequestJustification(hash, number) =>
self.chain_sync.request_justification(&hash, number),
self.strategy.request_justification(&hash, number),
ToServiceCommand::ClearJustificationRequests =>
self.chain_sync.clear_justification_requests(),
self.strategy.clear_justification_requests(),
ToServiceCommand::BlocksProcessed(imported, count, results) => {
self.chain_sync.on_blocks_processed(imported, count, results);
self.strategy.on_blocks_processed(imported, count, results);
},
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
self.strategy.on_justification_import(hash, number, success);
if !success {
log::info!(
target: LOG_TARGET,
@@ -849,9 +838,9 @@ where
},
ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
ToServiceCommand::NewBestBlockImported(hash, number) => {
log::debug!(target: "sync", "New best block imported {:?}/#{}", hash, number);
log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
self.chain_sync.update_chain_info(&hash, number);
self.strategy.update_chain_info(&hash, number);
let _ = self.notification_service.try_set_handshake(
BlockAnnouncesHandshake::<B>::build(
self.roles,
@@ -863,7 +852,7 @@ where
);
},
ToServiceCommand::Status(tx) => {
let mut status = self.chain_sync.status();
let mut status = self.strategy.status();
status.num_connected_peers = self.peers.len() as u32;
let _ = tx.send(status);
},
@@ -871,22 +860,22 @@ where
let _ = tx.send(self.num_active_peers());
},
ToServiceCommand::SyncState(tx) => {
let _ = tx.send(self.chain_sync.status());
let _ = tx.send(self.strategy.status());
},
ToServiceCommand::BestSeenBlock(tx) => {
let _ = tx.send(self.chain_sync.status().best_seen_block);
let _ = tx.send(self.strategy.status().best_seen_block);
},
ToServiceCommand::NumSyncPeers(tx) => {
let _ = tx.send(self.chain_sync.status().num_peers);
let _ = tx.send(self.strategy.status().num_peers);
},
ToServiceCommand::NumQueuedBlocks(tx) => {
let _ = tx.send(self.chain_sync.status().queued_blocks);
let _ = tx.send(self.strategy.status().queued_blocks);
},
ToServiceCommand::NumDownloadedBlocks(tx) => {
let _ = tx.send(self.chain_sync.num_downloaded_blocks());
let _ = tx.send(self.strategy.num_downloaded_blocks());
},
ToServiceCommand::NumSyncRequests(tx) => {
let _ = tx.send(self.chain_sync.num_sync_requests());
let _ = tx.send(self.strategy.num_sync_requests());
},
ToServiceCommand::PeersInfo(tx) => {
let peers_info = self
@@ -897,7 +886,7 @@ where
let _ = tx.send(peers_info);
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
self.chain_sync.on_block_finalized(&hash, *header.number()),
self.strategy.on_block_finalized(&hash, *header.number()),
}
}
@@ -961,11 +950,18 @@ where
fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
match header {
Ok(header) => {
self.chain_sync.set_warp_sync_target_block(header);
},
Ok(header) =>
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
warp_sync.set_target_block(header);
} else {
error!(
target: LOG_TARGET,
"Cannot set warp sync target block: no warp sync strategy is active."
);
debug_assert!(false);
},
Err(err) => {
log::error!(
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
@@ -1005,7 +1001,7 @@ where
}
}
self.chain_sync.peer_disconnected(&peer_id);
self.strategy.remove_peer(&peer_id);
self.pending_responses.remove(&peer_id);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
@@ -1091,7 +1087,7 @@ where
let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
if handshake.roles.is_full() &&
self.chain_sync.num_peers() >=
self.strategy.num_peers() >=
self.default_peers_set_num_full +
self.default_peers_set_no_slot_connected_peers.len() +
this_peer_reserved_slot
@@ -1115,7 +1111,7 @@ where
// `ChainSync` only accepts full peers whereas `SyncingEngine` accepts both full and light
// peers. Verify that there is a slot in `SyncingEngine` for the inbound light peer
if handshake.roles.is_light() &&
(self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light
(self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
{
log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
return Err(false)
@@ -1149,7 +1145,10 @@ where
inbound: direction.is_inbound(),
};
self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number);
// Only forward full peers to syncing strategy.
if status.roles.is_full() {
self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
}
log::debug!(target: LOG_TARGET, "Connected {peer_id}");
@@ -1267,7 +1266,7 @@ where
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
self.chain_sync.on_block_response(peer_id, req, blocks);
self.strategy.on_block_response(peer_id, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
@@ -1312,10 +1311,10 @@ where
},
};
self.chain_sync.on_state_response(peer_id, response);
self.strategy.on_state_response(peer_id, response);
},
PeerRequest::WarpProof => {
self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp));
self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp));
},
},
Ok(Err(e)) => {