Get rid of polling in WarpSync (#1265)

This commit is contained in:
Dmitry Markin
2023-09-05 12:34:50 +03:00
committed by GitHub
parent adf847a582
commit 12194445b2
9 changed files with 344 additions and 234 deletions
+88 -30
View File
@@ -24,11 +24,16 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
ChainSync, ClientError, SyncingService,
};
use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use futures::{
channel::oneshot,
future::{BoxFuture, Fuse},
FutureExt, StreamExt,
};
use futures_timer::Delay;
use libp2p::PeerId;
use prometheus_endpoint::{
@@ -47,7 +52,6 @@ use sc_network_common::{
role::Roles,
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
warp::WarpSyncParams,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent,
},
};
@@ -67,6 +71,9 @@ use std::{
time::{Duration, Instant},
};
/// Log target for this file.
const LOG_TARGET: &'static str = "sync";
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);
@@ -251,6 +258,10 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// The `PeerId`'s of all boot nodes.
boot_node_ids: HashSet<PeerId>,
/// A channel to get target block header if we skip over proofs downloading during warp sync.
warp_sync_target_block_header_rx:
Fuse<BoxFuture<'static, Result<B::Header, oneshot::Canceled>>>,
/// Protocol name used for block announcements
block_announce_protocol_name: ProtocolName,
@@ -299,7 +310,11 @@ where
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
{
log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE);
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {}",
crate::MAX_BLOCKS_IN_RESPONSE,
);
crate::MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
@@ -352,6 +367,19 @@ where
total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
};
// Split warp sync params into warp sync config and a channel to retreive target block
// header.
let (warp_sync_config, warp_sync_target_block_header_rx) =
warp_sync_params.map_or((None, None), |params| {
let (config, target_block_rx) = params.split();
(Some(config), target_block_rx)
});
// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());
let (chain_sync, block_announce_config) = ChainSync::new(
mode,
client.clone(),
@@ -360,7 +388,7 @@ where
roles,
max_parallel_downloads,
max_blocks_per_request,
warp_sync_params,
warp_sync_config,
metrics_registry,
network_service.clone(),
import_queue,
@@ -404,6 +432,7 @@ where
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
warp_sync_target_block_header_rx,
boot_node_ids,
default_peers_set_no_slot_peers,
default_peers_set_num_full,
@@ -418,7 +447,7 @@ where
match Metrics::register(r, is_major_syncing.clone()) {
Ok(metrics) => Some(metrics),
Err(err) => {
log::error!(target: "sync", "Failed to register metrics {err:?}");
log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
None
},
}
@@ -510,7 +539,10 @@ where
let peer = match self.peers.get_mut(&peer_id) {
Some(p) => p,
None => {
log::error!(target: "sync", "Received block announce from disconnected peer {}", peer_id);
log::error!(
target: LOG_TARGET,
"Received block announce from disconnected peer {peer_id}",
);
debug_assert!(false);
return
},
@@ -536,11 +568,11 @@ where
let header = match self.client.header(hash) {
Ok(Some(header)) => header,
Ok(None) => {
log::warn!(target: "sync", "Trying to announce unknown block: {}", hash);
log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
return
},
Err(e) => {
log::warn!(target: "sync", "Error reading block header {}: {}", hash, e);
log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
return
},
};
@@ -551,7 +583,7 @@ where
}
let is_best = self.client.info().best_hash == hash;
log::debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best);
log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
let data = data
.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
@@ -560,7 +592,7 @@ where
for (peer_id, ref mut peer) in self.peers.iter_mut() {
let inserted = peer.known_blocks.insert(hash);
if inserted {
log::trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id);
log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
let message = BlockAnnounce {
header: header.clone(),
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
@@ -575,7 +607,7 @@ where
/// Inform sync about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
log::debug!(target: "sync", "New best block imported {:?}/#{}", hash, number);
log::debug!(target: LOG_TARGET, "New best block imported {hash:?}/#{number}");
self.chain_sync.update_chain_info(&hash, number);
self.network_service.set_notification_handshake(
@@ -619,7 +651,10 @@ where
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(target: "sync", "syncing has halted due to inactivity, evicting all peers");
log::debug!(
target: LOG_TARGET,
"syncing has halted due to inactivity, evicting all peers",
);
for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
@@ -658,7 +693,10 @@ where
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
if !success {
log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer_id, hash);
log::info!(
target: LOG_TARGET,
"💔 Invalid justification provided by {peer_id} for #{hash}",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(
@@ -723,7 +761,7 @@ where
},
Err(()) => {
log::debug!(
target: "sync",
target: LOG_TARGET,
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
@@ -732,7 +770,7 @@ where
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
target: LOG_TARGET,
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
);
@@ -749,9 +787,8 @@ where
}
} else {
log::trace!(
target: "sync",
"Received sync for peer earlier refused by sync layer: {}",
remote
target: LOG_TARGET,
"Received sync for peer earlier refused by sync layer: {remote}",
);
}
}
@@ -764,6 +801,21 @@ where
}
}
// Retreive warp sync target block header just before polling `ChainSync`
// to make progress as soon as we receive it.
match self.warp_sync_target_block_header_rx.poll_unpin(cx) {
Poll::Ready(Ok(target)) => {
self.chain_sync.set_warp_sync_target_block(target);
},
Poll::Ready(Err(err)) => {
log::error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
},
Poll::Pending => {},
}
// Drive `ChainSync`.
while let Poll::Ready(()) = self.chain_sync.poll(cx) {}
@@ -784,9 +836,9 @@ where
pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> {
if let Some(info) = self.peers.remove(&peer_id) {
if self.important_peers.contains(&peer_id) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer_id);
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
} else {
log::debug!(target: "sync", "{} disconnected", peer_id);
log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
}
if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
@@ -798,7 +850,7 @@ where
},
None => {
log::error!(
target: "sync",
target: LOG_TARGET,
"trying to disconnect an inbound node which is not counted as inbound"
);
debug_assert!(false);
@@ -828,10 +880,13 @@ where
sink: NotificationsSink,
inbound: bool,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", peer_id, status);
log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
if self.peers.contains_key(&peer_id) {
log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", peer_id);
log::error!(
target: LOG_TARGET,
"Called on_sync_peer_connected with already connected peer {peer_id}",
);
debug_assert!(false);
return Err(())
}
@@ -841,7 +896,7 @@ where
if self.important_peers.contains(&peer_id) {
log::error!(
target: "sync",
target: LOG_TARGET,
"Reserved peer id `{}` is on a different chain (our genesis: {} theirs: {})",
peer_id,
self.genesis_hash,
@@ -849,7 +904,7 @@ where
);
} else if self.boot_node_ids.contains(&peer_id) {
log::error!(
target: "sync",
target: LOG_TARGET,
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
peer_id,
self.genesis_hash,
@@ -857,7 +912,7 @@ where
);
} else {
log::debug!(
target: "sync",
target: LOG_TARGET,
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
@@ -874,7 +929,10 @@ where
status.roles.is_full() &&
inbound && self.num_in_peers == self.max_in_peers
{
log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {peer_id}");
log::debug!(
target: LOG_TARGET,
"All inbound slots have been consumed, rejecting {peer_id}",
);
return Err(())
}
@@ -884,7 +942,7 @@ where
self.default_peers_set_no_slot_connected_peers.len() +
this_peer_reserved_slot
{
log::debug!(target: "sync", "Too many full nodes, rejecting {}", peer_id);
log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
return Err(())
}
@@ -892,7 +950,7 @@ where
(self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light
{
// Make sure that not all slots are occupied by light clients.
log::debug!(target: "sync", "Too many light nodes, rejecting {}", peer_id);
log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
return Err(())
}
@@ -921,7 +979,7 @@ where
None
};
log::debug!(target: "sync", "Connected {}", peer_id);
log::debug!(target: LOG_TARGET, "Connected {peer_id}");
self.peers.insert(peer_id, peer);
+186 -149
View File
@@ -32,7 +32,7 @@ use crate::{
blocks::BlockCollection,
schema::v1::{StateRequest, StateResponse},
state::StateSync,
warp::{WarpProofImportResult, WarpSync},
warp::{WarpProofImportResult, WarpSync, WarpSyncConfig},
};
use codec::{Decode, DecodeAll, Encode};
@@ -61,7 +61,7 @@ use sc_network_common::{
BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest,
BlockResponse, Direction, FromBlock,
},
warp::{EncodedProof, WarpProofRequest, WarpSyncParams, WarpSyncPhase, WarpSyncProgress},
warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress},
BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification,
OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest,
OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, SyncState, SyncStatus,
@@ -103,6 +103,9 @@ pub mod state_request_handler;
pub mod warp;
pub mod warp_request_handler;
/// Log target for this file.
const LOG_TARGET: &'static str = "sync";
/// Maximum blocks to store in the import queue.
const MAX_IMPORTING_BLOCKS: usize = 2048;
@@ -302,10 +305,12 @@ pub struct ChainSync<B: BlockT, Client> {
state_sync: Option<StateSync<B, Client>>,
/// Warp sync in progress, if any.
warp_sync: Option<WarpSync<B, Client>>,
/// Warp sync params.
/// Warp sync configuration.
///
/// Will be `None` after `self.warp_sync` is `Some(_)`.
warp_sync_params: Option<WarpSyncParams<B>>,
warp_sync_config: Option<WarpSyncConfig<B>>,
/// A temporary storage for warp sync target block until warp sync is initialized.
warp_sync_target_block_header: Option<B::Header>,
/// Enable importing existing blocks. This is used used after the state download to
/// catch up to the latest state while re-importing blocks.
import_existing: bool,
@@ -351,7 +356,7 @@ impl<B: BlockT> PeerSync<B> {
fn update_common_number(&mut self, new_common: NumberFor<B>) {
if self.common_number < new_common {
trace!(
target: "sync",
target: LOG_TARGET,
"Updating peer {} common number from={} => to={}.",
self.peer_id,
self.common_number,
@@ -497,7 +502,7 @@ where
// There is nothing sync can get from the node that has no blockchain data.
match self.block_status(&best_hash) {
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {}", e);
debug!(target:LOG_TARGET, "Error reading blockchain: {e}");
Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR))
},
Ok(BlockStatus::KnownBad) => {
@@ -515,7 +520,7 @@ where
// an ancestor search, which is what we do in the next match case below.
if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() {
debug!(
target:"sync",
target:LOG_TARGET,
"New peer with unknown best hash {} ({}), assuming common block.",
self.best_queued_hash,
self.best_queued_number
@@ -536,10 +541,8 @@ where
// If we are at genesis, just start downloading.
let (state, req) = if self.best_queued_number.is_zero() {
debug!(
target:"sync",
"New peer with best hash {} ({}).",
best_hash,
best_number,
target:LOG_TARGET,
"New peer with best hash {best_hash} ({best_number}).",
);
(PeerSyncState::Available, None)
@@ -547,7 +550,7 @@ where
let common_best = std::cmp::min(self.best_queued_number, best_number);
debug!(
target:"sync",
target:LOG_TARGET,
"New peer with unknown best hash {} ({}), searching for common ancestor.",
best_hash,
best_number
@@ -578,9 +581,14 @@ where
if let SyncMode::Warp = self.mode {
if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none()
{
log::debug!(target: "sync", "Starting warp state sync.");
if let Some(params) = self.warp_sync_params.take() {
self.warp_sync = Some(WarpSync::new(self.client.clone(), params));
log::debug!(target: LOG_TARGET, "Starting warp state sync.");
if let Some(config) = self.warp_sync_config.take() {
let mut warp_sync = WarpSync::new(self.client.clone(), config);
if let Some(header) = self.warp_sync_target_block_header.take() {
warp_sync.set_target_block(header);
}
self.warp_sync = Some(warp_sync);
}
}
}
@@ -590,10 +598,8 @@ where
Ok(BlockStatus::InChainWithState) |
Ok(BlockStatus::InChainPruned) => {
debug!(
target: "sync",
"New peer with known best hash {} ({}).",
best_hash,
best_number,
target: LOG_TARGET,
"New peer with known best hash {best_hash} ({best_number}).",
);
self.peers.insert(
who,
@@ -642,21 +648,23 @@ where
.collect();
debug!(
target: "sync",
"Explicit sync request for block {:?} with no peers specified. \
Syncing from these peers {:?} instead.",
hash, peers,
target: LOG_TARGET,
"Explicit sync request for block {hash:?} with no peers specified. \
Syncing from these peers {peers:?} instead.",
);
} else {
debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers);
debug!(
target: LOG_TARGET,
"Explicit sync request for block {hash:?} with {peers:?}",
);
}
if self.is_known(hash) {
debug!(target: "sync", "Refusing to sync known hash {:?}", hash);
debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
return
}
trace!(target: "sync", "Downloading requested old fork {:?}", hash);
trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
for peer_id in &peers {
if let Some(peer) = self.peers.get_mut(peer_id) {
if let PeerSyncState::AncestorSearch { .. } = peer.state {
@@ -689,7 +697,7 @@ where
let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(who) {
let mut blocks = response.blocks;
if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
trace!(target: "sync", "Reversing incoming block list");
trace!(target: LOG_TARGET, "Reversing incoming block list");
blocks.reverse()
}
self.allowed_requests.add(who);
@@ -740,17 +748,22 @@ where
}
})
.collect();
debug!(target: "sync", "Drained {} gap blocks from {}", blocks.len(), gap_sync.best_queued_number);
debug!(
target: LOG_TARGET,
"Drained {} gap blocks from {}",
blocks.len(),
gap_sync.best_queued_number,
);
blocks
} else {
debug!(target: "sync", "Unexpected gap block response from {}", who);
debug!(target: LOG_TARGET, "Unexpected gap block response from {who}");
return Err(BadPeer(*who, rep::NO_BLOCK))
}
},
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
if blocks.is_empty() {
debug!(target: "sync", "Empty block response from {}", who);
debug!(target: LOG_TARGET, "Empty block response from {who}");
return Err(BadPeer(*who, rep::NO_BLOCK))
}
validate_blocks::<B>(&blocks, who, Some(request))?;
@@ -779,7 +792,7 @@ where
let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
(Some(block), Ok(maybe_our_block_hash)) => {
trace!(
target: "sync",
target: LOG_TARGET,
"Got ancestry block #{} ({}) from peer {}",
current,
block.hash,
@@ -789,17 +802,15 @@ where
},
(None, _) => {
debug!(
target: "sync",
"Invalid response when searching for ancestor from {}",
who,
target: LOG_TARGET,
"Invalid response when searching for ancestor from {who}",
);
return Err(BadPeer(*who, rep::UNKNOWN_ANCESTOR))
},
(_, Err(e)) => {
info!(
target: "sync",
"❌ Error answering legitimate blockchain query: {}",
e,
target: LOG_TARGET,
"❌ Error answering legitimate blockchain query: {e}",
);
return Err(BadPeer(*who, rep::BLOCKCHAIN_READ_ERROR))
},
@@ -817,7 +828,10 @@ where
}
}
if matching_hash.is_none() && current.is_zero() {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
trace!(
target:LOG_TARGET,
"Ancestry search: genesis mismatch for peer {who}",
);
return Err(BadPeer(*who, rep::GENESIS_MISMATCH))
}
if let Some((next_state, next_num)) =
@@ -833,7 +847,7 @@ where
// Ancestry search is complete. Check if peer is on a stale fork unknown
// to us and add it to sync targets if necessary.
trace!(
target: "sync",
target: LOG_TARGET,
"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
self.best_queued_hash,
self.best_queued_number,
@@ -846,7 +860,7 @@ where
peer.best_number < self.best_queued_number
{
trace!(
target: "sync",
target: LOG_TARGET,
"Added fork target {} for {}",
peer.best_hash,
who,
@@ -879,11 +893,11 @@ where
return Err(BadPeer(*who, rep::VERIFICATION_FAIL)),
}
} else if blocks.is_empty() {
debug!(target: "sync", "Empty block response from {}", who);
debug!(target: LOG_TARGET, "Empty block response from {who}");
return Err(BadPeer(*who, rep::NO_BLOCK))
} else {
debug!(
target: "sync",
target: LOG_TARGET,
"Too many blocks ({}) in warp target block response from {}",
blocks.len(),
who,
@@ -892,7 +906,7 @@ where
}
} else {
debug!(
target: "sync",
target: LOG_TARGET,
"Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.",
who,
);
@@ -944,7 +958,10 @@ where
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "💔 Called on_block_justification with a peer ID of an unknown peer");
error!(
target: LOG_TARGET,
"💔 Called on_block_justification with a peer ID of an unknown peer",
);
return Ok(OnBlockJustification::Nothing)
};
@@ -956,7 +973,7 @@ where
let justification = if let Some(block) = response.blocks.into_iter().next() {
if hash != block.hash {
warn!(
target: "sync",
target: LOG_TARGET,
"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
who,
hash,
@@ -972,10 +989,8 @@ where
// we might have asked the peer for a justification on a block that we assumed it
// had but didn't (regardless of whether it had a justification for it or not).
trace!(
target: "sync",
"Peer {:?} provided empty response for justification request {:?}",
who,
hash,
target: LOG_TARGET,
"Peer {who:?} provided empty response for justification request {hash:?}",
);
None
@@ -1013,10 +1028,8 @@ where
if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
if let Ok(Some(header)) = self.client.header(*hash) {
log::debug!(
target: "sync",
"Starting state sync for #{} ({})",
number,
hash,
target: LOG_TARGET,
"Starting state sync for #{number} ({hash})",
);
self.state_sync = Some(StateSync::new(
self.client.clone(),
@@ -1033,9 +1046,8 @@ where
if let Err(err) = r {
warn!(
target: "sync",
"💔 Error cleaning up pending extra justification data requests: {}",
err,
target: LOG_TARGET,
"💔 Error cleaning up pending extra justification data requests: {err}",
);
}
}
@@ -1057,12 +1069,12 @@ where
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "💔 Called `on_validated_block_announce` with a bad peer ID");
error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID");
return
};
if let PeerSyncState::AncestorSearch { .. } = peer.state {
trace!(target: "sync", "Peer {} is in the ancestor search state.", who);
trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", who);
return
}
@@ -1222,12 +1234,6 @@ where
}
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> {
// Should be called before `process_outbound_requests` to ensure
// that a potential target block is directly leading to requests.
if let Some(warp_sync) = &mut self.warp_sync {
let _ = warp_sync.poll(cx);
}
self.process_outbound_requests();
while let Poll::Ready(result) = self.poll_pending_responses(cx) {
@@ -1262,9 +1268,8 @@ where
},
Err(err) => {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
opaque_req, err
target: LOG_TARGET,
"Failed to encode block request {opaque_req:?}: {err:?}",
);
},
}
@@ -1292,7 +1297,7 @@ where
roles: Roles,
max_parallel_downloads: u32,
max_blocks_per_request: u32,
warp_sync_params: Option<WarpSyncParams<B>>,
warp_sync_config: Option<WarpSyncConfig<B>>,
metrics_registry: Option<&Registry>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
@@ -1334,7 +1339,8 @@ where
network_service,
block_request_protocol_name,
state_request_protocol_name,
warp_sync_params,
warp_sync_config,
warp_sync_target_block_header: None,
warp_sync_protocol_name,
block_announce_protocol_name: block_announce_config
.notifications_protocol
@@ -1346,7 +1352,10 @@ where
match SyncingMetrics::register(r) {
Ok(metrics) => Some(metrics),
Err(err) => {
error!(target: "sync", "Failed to register metrics for ChainSync: {err:?}");
error!(
target: LOG_TARGET,
"Failed to register metrics for ChainSync: {err:?}",
);
None
},
}
@@ -1403,7 +1412,7 @@ where
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
if new_blocks.len() != orig_len {
debug!(
target: "sync",
target: LOG_TARGET,
"Ignoring {} blocks that are already queued",
orig_len - new_blocks.len(),
);
@@ -1420,7 +1429,7 @@ where
.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
{
trace!(
target:"sync",
target:LOG_TARGET,
"Accepted {} blocks ({:?}) with origin {:?}",
new_blocks.len(),
h,
@@ -1444,7 +1453,7 @@ where
/// through all peers to update our view of their state as well.
fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
if self.fork_targets.remove(hash).is_some() {
trace!(target: "sync", "Completed fork sync {:?}", hash);
trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
}
if let Some(gap_sync) = &mut self.gap_sync {
if number > gap_sync.best_queued_number && number <= gap_sync.target {
@@ -1463,7 +1472,7 @@ where
let new_common_number =
if peer.best_number >= number { number } else { peer.best_number };
trace!(
target: "sync",
target: LOG_TARGET,
"Updating peer {} info, ours={}, common={}->{}, their best={}",
n,
number,
@@ -1483,10 +1492,15 @@ where
fn restart(&mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + '_ {
self.blocks.clear();
if let Err(e) = self.reset_sync_start_point() {
warn!(target: "sync", "💔 Unable to restart sync: {}", e);
warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
}
self.allowed_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
debug!(
target: LOG_TARGET,
"Restarted with {} ({})",
self.best_queued_number,
self.best_queued_hash,
);
let old_peers = std::mem::take(&mut self.peers);
old_peers.into_iter().filter_map(move |(id, mut p)| {
@@ -1517,14 +1531,14 @@ where
let info = self.client.info();
if matches!(self.mode, SyncMode::LightState { .. }) && info.finalized_state.is_some() {
warn!(
target: "sync",
target: LOG_TARGET,
"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
);
self.mode = SyncMode::Full;
}
if matches!(self.mode, SyncMode::Warp) && info.finalized_state.is_some() {
warn!(
target: "sync",
target: LOG_TARGET,
"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
);
self.mode = SyncMode::Full;
@@ -1539,25 +1553,30 @@ where
self.import_existing = true;
// Latest state is missing, start with the last finalized state or genesis instead.
if let Some((hash, number)) = info.finalized_state {
debug!(target: "sync", "Starting from finalized state #{}", number);
debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
self.best_queued_hash = hash;
self.best_queued_number = number;
} else {
debug!(target: "sync", "Restarting from genesis");
debug!(target: LOG_TARGET, "Restarting from genesis");
self.best_queued_hash = Default::default();
self.best_queued_number = Zero::zero();
}
}
if let Some((start, end)) = info.block_gap {
debug!(target: "sync", "Starting gap sync #{} - #{}", start, end);
debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}");
self.gap_sync = Some(GapSync {
best_queued_number: start - One::one(),
target: end,
blocks: BlockCollection::new(),
});
}
trace!(target: "sync", "Restarted sync at #{} ({:?})", self.best_queued_number, self.best_queued_hash);
trace!(
target: LOG_TARGET,
"Restarted sync at #{} ({:?})",
self.best_queued_number,
self.best_queued_hash,
);
Ok(())
}
@@ -1607,6 +1626,15 @@ where
.collect()
}
/// Set warp sync target block externally in case we skip warp proof downloading.
pub fn set_warp_sync_target_block(&mut self, header: B::Header) {
if let Some(ref mut warp_sync) = self.warp_sync {
warp_sync.set_target_block(header);
} else {
self.warp_sync_target_block_header = Some(header);
}
}
/// Generate block request for downloading of the target block body during warp sync.
fn warp_target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
let sync = &self.warp_sync.as_ref()?;
@@ -1625,7 +1653,7 @@ where
// Find a random peer that has a block with the target number.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target_number {
trace!(target: "sync", "New warp target block request for {}", id);
trace!(target: LOG_TARGET, "New warp target block request for {id}");
peer.state = PeerSyncState::DownloadingWarpTargetBlock;
self.allowed_requests.clear();
return Some((*id, request))
@@ -1716,9 +1744,8 @@ where
},
Err(err) => {
log::warn!(
target: "sync",
"Failed to encode state request {:?}: {:?}",
request, err
target: LOG_TARGET,
"Failed to encode state request {request:?}: {err:?}",
);
},
}
@@ -1742,9 +1769,8 @@ where
),
None => {
log::warn!(
target: "sync",
"Trying to send warp sync request when no protocol is configured {:?}",
request,
target: LOG_TARGET,
"Trying to send warp sync request when no protocol is configured {request:?}",
);
},
}
@@ -1759,7 +1785,12 @@ where
let blocks = match self.block_response_into_blocks(&request, response) {
Ok(blocks) => blocks,
Err(err) => {
debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err);
debug!(
target: LOG_TARGET,
"Failed to decode block response from {}: {}",
peer_id,
err,
);
self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
return None
},
@@ -1779,7 +1810,7 @@ where
_ => Default::default(),
};
trace!(
target: "sync",
target: LOG_TARGET,
"BlockResponse {} from {} with {} blocks {}",
block_response.id,
peer_id,
@@ -1888,10 +1919,8 @@ where
Ok(proto) => proto,
Err(e) => {
debug!(
target: "sync",
"Failed to decode block response from peer {:?}: {:?}.",
id,
e
target: LOG_TARGET,
"Failed to decode block response from peer {id:?}: {e:?}.",
);
self.network_service.report_peer(id, rep::BAD_MESSAGE);
self.network_service
@@ -1909,10 +1938,8 @@ where
Ok(proto) => proto,
Err(e) => {
debug!(
target: "sync",
"Failed to decode state response from peer {:?}: {:?}.",
id,
e
target: LOG_TARGET,
"Failed to decode state response from peer {id:?}: {e:?}.",
);
self.network_service.report_peer(id, rep::BAD_MESSAGE);
self.network_service
@@ -1930,7 +1957,7 @@ where
},
},
Ok(Err(e)) => {
debug!(target: "sync", "Request to peer {:?} failed: {:?}.", id, e);
debug!(target: LOG_TARGET, "Request to peer {id:?} failed: {e:?}.");
match e {
RequestFailure::Network(OutboundFailure::Timeout) => {
@@ -1971,9 +1998,8 @@ where
},
Err(oneshot::Canceled) => {
trace!(
target: "sync",
"Request to peer {:?} failed due to oneshot being canceled.",
id,
target: LOG_TARGET,
"Request to peer {id:?} failed due to oneshot being canceled.",
);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
@@ -2058,7 +2084,7 @@ where
}
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
trace!(target: LOG_TARGET, "Too many blocks in the queue.");
return Vec::new()
}
let is_major_syncing = self.status().state.is_major_syncing();
@@ -2093,7 +2119,7 @@ where
queue.len() <= MAJOR_SYNC_BLOCKS.into()
{
trace!(
target: "sync",
target: LOG_TARGET,
"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
id,
peer.common_number,
@@ -2118,7 +2144,7 @@ where
) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(
target: "sync",
target: LOG_TARGET,
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
@@ -2141,7 +2167,7 @@ where
},
max_blocks_per_request,
) {
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
peer.state = PeerSyncState::DownloadingStale(hash);
Some((id, req))
} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
@@ -2157,7 +2183,7 @@ where
}) {
peer.state = PeerSyncState::DownloadingGap(range.start);
trace!(
target: "sync",
target: LOG_TARGET,
"New gap block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
@@ -2192,7 +2218,7 @@ where
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
self.allowed_requests.clear();
return Some((*id, OpaqueStateRequest(Box::new(request))))
}
@@ -2207,7 +2233,7 @@ where
{
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target {
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
trace!(target: LOG_TARGET, "New StateRequest for {id}: {request:?}");
peer.state = PeerSyncState::DownloadingState;
self.allowed_requests.clear();
return Some((*id, OpaqueStateRequest(Box::new(request))))
@@ -2237,7 +2263,7 @@ where
// Find a random peer that is synced as much as peer majority.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= median {
trace!(target: "sync", "New WarpProofRequest for {}", id);
trace!(target: LOG_TARGET, "New WarpProofRequest for {id}");
peer.state = PeerSyncState::DownloadingWarpProof;
self.allowed_requests.clear();
return Some((*id, request))
@@ -2256,7 +2282,7 @@ where
) -> Result<OnStateData<B>, BadPeer> {
let response: Box<StateResponse> = response.0.downcast().map_err(|_error| {
error!(
target: "sync",
target: LOG_TARGET,
"Failed to downcast opaque state response, this is an implementation bug."
);
@@ -2271,7 +2297,7 @@ where
}
let import_result = if let Some(sync) = &mut self.state_sync {
debug!(
target: "sync",
target: LOG_TARGET,
"Importing state data from {} with {} keys, {} proof nodes.",
who,
response.entries.len(),
@@ -2280,7 +2306,7 @@ where
sync.import(*response)
} else if let Some(sync) = &mut self.warp_sync {
debug!(
target: "sync",
target: LOG_TARGET,
"Importing state data from {} with {} keys, {} proof nodes.",
who,
response.entries.len(),
@@ -2288,7 +2314,7 @@ where
);
sync.import_state(*response)
} else {
debug!(target: "sync", "Ignored obsolete state response from {}", who);
debug!(target: LOG_TARGET, "Ignored obsolete state response from {who}");
return Err(BadPeer(*who, rep::NOT_REQUESTED))
};
@@ -2307,12 +2333,12 @@ where
skip_execution: self.skip_execution(),
state: Some(state),
};
debug!(target: "sync", "State download is complete. Import is queued");
debug!(target: LOG_TARGET, "State download is complete. Import is queued");
Ok(OnStateData::Import(origin, block))
},
state::ImportResult::Continue => Ok(OnStateData::Continue),
state::ImportResult::BadResponse => {
debug!(target: "sync", "Bad state data received from {}", who);
debug!(target: LOG_TARGET, "Bad state data received from {who}");
Err(BadPeer(*who, rep::BAD_BLOCK))
},
}
@@ -2327,21 +2353,21 @@ where
}
let import_result = if let Some(sync) = &mut self.warp_sync {
debug!(
target: "sync",
target: LOG_TARGET,
"Importing warp proof data from {}, {} bytes.",
who,
response.0.len(),
);
sync.import_warp_proof(response)
} else {
debug!(target: "sync", "Ignored obsolete warp sync response from {}", who);
debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {who}");
return Err(BadPeer(*who, rep::NOT_REQUESTED))
};
match import_result {
WarpProofImportResult::Success => Ok(()),
WarpProofImportResult::BadResponse => {
debug!(target: "sync", "Bad proof data received from {}", who);
debug!(target: LOG_TARGET, "Bad proof data received from {who}");
Err(BadPeer(*who, rep::BAD_BLOCK))
},
}
@@ -2379,7 +2405,7 @@ where
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>>> {
trace!(target: "sync", "Imported {} of {}", imported, count);
trace!(target: LOG_TARGET, "Imported {imported} of {count}");
let mut output = Vec::new();
@@ -2406,7 +2432,7 @@ where
Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => {
if aux.clear_justification_requests {
trace!(
target: "sync",
target: LOG_TARGET,
"Block imported clears all pending justification requests {number}: {hash:?}",
);
self.clear_justification_requests();
@@ -2414,7 +2440,7 @@ where
if aux.needs_justification {
trace!(
target: "sync",
target: LOG_TARGET,
"Block imported but requires justification {number}: {hash:?}",
);
self.request_justification(&hash, number);
@@ -2434,7 +2460,7 @@ where
self.state_sync.as_ref().map_or(false, |s| s.target() == hash);
if state_sync_complete {
info!(
target: "sync",
target: LOG_TARGET,
"State sync is complete ({} MiB), restarting block sync.",
self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
);
@@ -2448,7 +2474,7 @@ where
.map_or(false, |s| s.target_block_hash() == Some(hash));
if warp_sync_complete {
info!(
target: "sync",
target: LOG_TARGET,
"Warp sync is complete ({} MiB), restarting block sync.",
self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)),
);
@@ -2460,7 +2486,7 @@ where
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
if gap_sync_complete {
info!(
target: "sync",
target: LOG_TARGET,
"Block history download is complete."
);
self.gap_sync = None;
@@ -2469,7 +2495,7 @@ where
Err(BlockImportError::IncompleteHeader(who)) =>
if let Some(peer) = who {
warn!(
target: "sync",
target: LOG_TARGET,
"💔 Peer sent block with incomplete header to import",
);
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
@@ -2480,7 +2506,7 @@ where
who.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
warn!(
target: "sync",
target: LOG_TARGET,
"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
);
@@ -2493,7 +2519,7 @@ where
Err(BlockImportError::BadBlock(who)) =>
if let Some(peer) = who {
warn!(
target: "sync",
target: LOG_TARGET,
"💔 Block {hash:?} received from peer {peer} has been blacklisted",
);
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
@@ -2502,10 +2528,10 @@ where
// This may happen if the chain we were requesting upon has been discarded
// in the meantime because other chain has been finalized.
// Don't mark it as bad as it still may be synced if explicitly requested.
trace!(target: "sync", "Obsolete block {hash:?}");
trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
},
e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
warn!(target: "sync", "💔 Error importing block {hash:?}: {}", e.unwrap_err());
warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
self.state_sync = None;
self.warp_sync = None;
output.extend(self.restart());
@@ -2625,7 +2651,7 @@ fn peer_block_request<B: BlockT>(
return None
} else if peer.common_number < finalized {
trace!(
target: "sync",
target: LOG_TARGET,
"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
id, peer.common_number, finalized, peer.best_number, best_num,
);
@@ -2704,11 +2730,21 @@ fn fork_sync_request<B: BlockT>(
) -> Option<(B::Hash, BlockRequest<B>)> {
targets.retain(|hash, r| {
if r.number <= finalized {
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number);
trace!(
target: LOG_TARGET,
"Removed expired fork sync request {:?} (#{})",
hash,
r.number,
);
return false
}
if check_block(hash) != BlockStatus::Unknown {
trace!(target: "sync", "Removed obsolete fork sync request {:?} (#{})", hash, r.number);
trace!(
target: LOG_TARGET,
"Removed obsolete fork sync request {:?} (#{})",
hash,
r.number,
);
return false
}
true
@@ -2729,7 +2765,10 @@ fn fork_sync_request<B: BlockT>(
// request only single block
1
};
trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count);
trace!(
target: LOG_TARGET,
"Downloading requested fork {hash:?} from {id}, {count} blocks",
);
return Some((
*hash,
BlockRequest::<B> {
@@ -2741,7 +2780,7 @@ fn fork_sync_request<B: BlockT>(
},
))
} else {
trace!(target: "sync", "Fork too far in the future: {:?} (#{})", hash, r.number);
trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
}
}
None
@@ -2778,7 +2817,7 @@ fn validate_blocks<Block: BlockT>(
if let Some(request) = request {
if Some(blocks.len() as _) > request.max {
debug!(
target: "sync",
target: LOG_TARGET,
"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
who,
request.max,
@@ -2799,7 +2838,7 @@ fn validate_blocks<Block: BlockT>(
if !expected_block {
debug!(
target: "sync",
target: LOG_TARGET,
"Received block that was not requested. Requested {:?}, got {:?}.",
request.from,
block_header,
@@ -2812,9 +2851,8 @@ fn validate_blocks<Block: BlockT>(
blocks.iter().any(|b| b.header.is_none())
{
trace!(
target: "sync",
"Missing requested header for a block in response from {}.",
who,
target: LOG_TARGET,
"Missing requested header for a block in response from {who}.",
);
return Err(BadPeer(*who, rep::BAD_RESPONSE))
@@ -2823,9 +2861,8 @@ fn validate_blocks<Block: BlockT>(
if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
{
trace!(
target: "sync",
"Missing requested body for a block in response from {}.",
who,
target: LOG_TARGET,
"Missing requested body for a block in response from {who}.",
);
return Err(BadPeer(*who, rep::BAD_RESPONSE))
@@ -2837,7 +2874,7 @@ fn validate_blocks<Block: BlockT>(
let hash = header.hash();
if hash != b.hash {
debug!(
target:"sync",
target:LOG_TARGET,
"Bad header received from {}. Expected hash {:?}, got {:?}",
who,
b.hash,
@@ -2854,7 +2891,7 @@ fn validate_blocks<Block: BlockT>(
);
if expected != got {
debug!(
target:"sync",
target:LOG_TARGET,
"Bad extrinsic root for a block {} received from {}. Expected {:?}, got {:?}",
b.hash,
who,
@@ -3092,7 +3129,7 @@ mod test {
) -> BlockRequest<Block> {
let requests = sync.block_requests();
log::trace!(target: "sync", "Requests: {:?}", requests);
log::trace!(target: LOG_TARGET, "Requests: {requests:?}");
assert_eq!(1, requests.len());
assert_eq!(*peer, requests[0].0);
@@ -3469,7 +3506,7 @@ mod test {
break
};
log::trace!(target: "sync", "Request: {:?}", request);
log::trace!(target: LOG_TARGET, "Request: {request:?}");
}
// Now request and import the fork.
@@ -3611,7 +3648,7 @@ mod test {
break
};
log::trace!(target: "sync", "Request: {:?}", request);
log::trace!(target: LOG_TARGET, "Request: {request:?}");
}
// Now request and import the fork.
+61 -34
View File
@@ -19,36 +19,75 @@
//! Warp sync support.
use crate::{
oneshot,
schema::v1::{StateRequest, StateResponse},
state::{ImportResult, StateSync},
};
use futures::FutureExt;
use futures::channel::oneshot;
use log::error;
use sc_client_api::ProofProvider;
use sc_network_common::sync::{
message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock},
warp::{
EncodedProof, VerificationResult, WarpProofRequest, WarpSyncParams, WarpSyncPhase,
WarpSyncProgress, WarpSyncProvider,
EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress,
WarpSyncProvider,
},
};
use sp_blockchain::HeaderBackend;
use sp_consensus_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use std::{sync::Arc, task::Poll};
use std::sync::Arc;
/// Log target for this file.
const LOG_TARGET: &'static str = "sync";
/// The different types of warp syncing, passed to `build_network`.
pub enum WarpSyncParams<Block: BlockT> {
/// Standard warp sync for the chain.
WithProvider(Arc<dyn WarpSyncProvider<Block>>),
/// Skip downloading proofs and wait for a header of the state that should be downloaded.
///
/// It is expected that the header provider ensures that the header is trusted.
WaitForTarget(oneshot::Receiver<<Block as BlockT>::Header>),
}
/// Warp sync configuration as accepted by [`WarpSync`].
pub enum WarpSyncConfig<Block: BlockT> {
/// Standard warp sync for the chain.
WithProvider(Arc<dyn WarpSyncProvider<Block>>),
/// Skip downloading proofs and wait for a header of the state that should be downloaded.
///
/// It is expected that the header provider ensures that the header is trusted.
WaitForTarget,
}
impl<Block: BlockT> WarpSyncParams<Block> {
/// Split `WarpSyncParams` into `WarpSyncConfig` and warp sync target block header receiver.
pub fn split(
self,
) -> (WarpSyncConfig<Block>, Option<oneshot::Receiver<<Block as BlockT>::Header>>) {
match self {
WarpSyncParams::WithProvider(provider) =>
(WarpSyncConfig::WithProvider(provider), None),
WarpSyncParams::WaitForTarget(rx) => (WarpSyncConfig::WaitForTarget, Some(rx)),
}
}
}
/// Warp sync phase.
enum Phase<B: BlockT, Client> {
/// Downloading warp proofs.
WarpProof {
set_id: SetId,
authorities: AuthorityList,
last_hash: B::Hash,
warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
},
PendingTargetBlock {
target_block: Option<oneshot::Receiver<B::Header>>,
},
/// Waiting for target block to be set externally if we skip warp proofs downloading,
/// and start straight from the target block (used by parachains warp sync).
PendingTargetBlock,
/// Downloading target block.
TargetBlock(B::Header),
/// Downloading state.
State(StateSync<B, Client>),
}
@@ -83,10 +122,10 @@ where
/// Create a new instance. When passing a warp sync provider we will be checking for proof and
/// authorities. Alternatively we can pass a target block when we want to skip downloading
/// proofs, in this case we will continue polling until the target block is known.
pub fn new(client: Arc<Client>, warp_sync_params: WarpSyncParams<B>) -> Self {
pub fn new(client: Arc<Client>, warp_sync_config: WarpSyncConfig<B>) -> Self {
let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists");
match warp_sync_params {
WarpSyncParams::WithProvider(warp_sync_provider) => {
match warp_sync_config {
WarpSyncConfig::WithProvider(warp_sync_provider) => {
let phase = Phase::WarpProof {
set_id: 0,
authorities: warp_sync_provider.current_authorities(),
@@ -95,35 +134,23 @@ where
};
Self { client, phase, total_proof_bytes: 0 }
},
WarpSyncParams::WaitForTarget(block) => Self {
client,
phase: Phase::PendingTargetBlock { target_block: Some(block) },
total_proof_bytes: 0,
},
WarpSyncConfig::WaitForTarget =>
Self { client, phase: Phase::PendingTargetBlock, total_proof_bytes: 0 },
}
}
/// Poll to make progress.
///
/// This only makes progress when `phase = Phase::PendingTargetBlock` and the pending block was
/// sent.
pub fn poll(&mut self, cx: &mut std::task::Context) {
let new_phase = if let Phase::PendingTargetBlock { target_block: Some(target_block) } =
&mut self.phase
{
match target_block.poll_unpin(cx) {
Poll::Ready(Ok(target)) => Phase::TargetBlock(target),
Poll::Ready(Err(e)) => {
error!(target: "sync", "Failed to get target block. Error: {:?}",e);
Phase::PendingTargetBlock { target_block: None }
},
_ => return,
}
} else {
/// Set target block externally in case we skip warp proof downloading.
pub fn set_target_block(&mut self, header: B::Header) {
let Phase::PendingTargetBlock = self.phase else {
error!(
target: LOG_TARGET,
"Attempt to set warp sync target block in invalid phase.",
);
debug_assert!(false);
return
};
self.phase = new_phase;
self.phase = Phase::TargetBlock(header);
}
/// Validate and import a state response.