Warp sync part I (#9227)

* Started warp sync

* BABE & GRANDPA recovery

* Warp sync protocol

* Sync warp proofs first

* Added basic documentation

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Style changes

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* fmt

* Apply suggestions from code review

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Fixed chage trie pruning wrt missing blocks

* Restore parent finalization

* fmt

* fmt

* Revert pwasm-utils bump

* Change error type & check API version

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Build fix

* Fixed target block check

* Formatting

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Arkadiy Paronyan
2021-08-02 10:27:18 +02:00
committed by GitHub
parent fa8c6274ac
commit f56232602f
51 changed files with 1308 additions and 448 deletions
+212 -15
View File
@@ -27,6 +27,7 @@
//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
//! the network, or whenever a block has been successfully verified, call the appropriate method in
//! order to update it.
//!
use crate::{
protocol::message::{self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse},
@@ -62,10 +63,12 @@ use std::{
pin::Pin,
sync::Arc,
};
use warp::{WarpProofRequest, WarpSync, WarpSyncProvider};
mod blocks;
mod extra_requests;
mod state;
mod warp;
/// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 128;
@@ -101,6 +104,9 @@ const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
/// so far behind.
const MAJOR_SYNC_BLOCKS: u8 = 5;
/// Number of peers that need to be connected before warp sync is started.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
mod rep {
use sc_peerset::ReputationChange as Rep;
/// Reputation change when a peer sent us a message that led to a
@@ -217,6 +223,10 @@ pub struct ChainSync<B: BlockT> {
block_announce_validation_per_peer_stats: HashMap<PeerId, usize>,
/// State sync in progress, if any.
state_sync: Option<StateSync<B>>,
/// Warp sync in progress, if any.
warp_sync: Option<WarpSync<B>>,
/// Warp sync provider.
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
/// Enable importing existing blocks. This is used used after the state download to
/// catch up to the latest state while re-importing blocks.
import_existing: bool,
@@ -290,6 +300,8 @@ pub enum PeerSyncState<B: BlockT> {
DownloadingJustification(B::Hash),
/// Downloading state.
DownloadingState,
/// Downloading warp proof.
DownloadingWarpProof,
}
impl<B: BlockT> PeerSyncState<B> {
@@ -316,6 +328,39 @@ pub struct StateDownloadProgress {
pub size: u64,
}
/// Reported warp sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase {
/// Waiting for peers to connect.
AwaitingPeers,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading state data.
DownloadingState,
/// Importing state.
ImportingState,
}
impl fmt::Display for WarpSyncPhase {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
WarpSyncPhase::AwaitingPeers => write!(f, "Waiting for peers"),
WarpSyncPhase::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
WarpSyncPhase::DownloadingState => write!(f, "Downloading state"),
WarpSyncPhase::ImportingState => write!(f, "Importing state"),
}
}
}
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress {
/// Estimated download percentage.
pub phase: WarpSyncPhase,
/// Total bytes downloaded so far.
pub total_bytes: u64,
}
/// Syncing status and statistics.
#[derive(Clone)]
pub struct Status<B: BlockT> {
@@ -329,6 +374,8 @@ pub struct Status<B: BlockT> {
pub queued_blocks: u32,
/// State sync status in progress, if any.
pub state_sync: Option<StateDownloadProgress>,
/// Warp sync in progress, if any.
pub warp_sync: Option<WarpSyncProgress>,
}
/// A peer did not behave as expected and should be reported.
@@ -373,6 +420,15 @@ pub enum OnStateData<B: BlockT> {
Request(PeerId, StateRequest),
}
/// Result of [`ChainSync::on_warp_sync_data`].
#[derive(Debug)]
pub enum OnWarpSyncData<B: BlockT> {
/// Warp proof request is issued.
WarpProofRequest(PeerId, warp::WarpProofRequest<B>),
/// A new state request needs to be made to the given peer.
StateRequest(PeerId, StateRequest),
}
/// Result of [`ChainSync::poll_block_announce_validation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollBlockAnnounceValidation<H> {
@@ -460,6 +516,8 @@ pub enum SyncMode {
Full,
// Sync headers and the last finalied state
LightState { storage_chain_mode: bool, skip_proofs: bool },
// Warp sync mode.
Warp,
}
/// Result of [`ChainSync::has_slot_for_block_announce_validation`].
@@ -479,6 +537,7 @@ impl<B: BlockT> ChainSync<B> {
client: Arc<dyn crate::chain::Client<B>>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
) -> Result<Self, ClientError> {
let mut sync = ChainSync {
client,
@@ -497,6 +556,8 @@ impl<B: BlockT> ChainSync<B> {
block_announce_validation: Default::default(),
block_announce_validation_per_peer_stats: Default::default(),
state_sync: None,
warp_sync: None,
warp_sync_provider,
import_existing: false,
};
sync.reset_sync_start_point()?;
@@ -508,7 +569,7 @@ impl<B: BlockT> ChainSync<B> {
SyncMode::Full =>
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
SyncMode::LightState { storage_chain_mode: false, .. } =>
SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp =>
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
SyncMode::LightState { storage_chain_mode: true, .. } =>
BlockAttributes::HEADER |
@@ -522,6 +583,7 @@ impl<B: BlockT> ChainSync<B> {
SyncMode::Full => false,
SyncMode::Light => true,
SyncMode::LightState { .. } => true,
SyncMode::Warp => true,
}
}
@@ -550,12 +612,20 @@ impl<B: BlockT> ChainSync<B> {
SyncState::Idle
};
let warp_sync_progress = match (&self.warp_sync, &self.mode) {
(None, SyncMode::Warp) =>
Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingPeers, total_bytes: 0 }),
(Some(sync), _) => Some(sync.progress()),
_ => None,
};
Status {
state: sync_state,
best_seen_block: best_seen,
num_peers: self.peers.len() as u32,
queued_blocks: self.queue_blocks.len() as u32,
state_sync: self.state_sync.as_ref().map(|s| s.progress()),
warp_sync: warp_sync_progress,
}
}
@@ -620,6 +690,17 @@ impl<B: BlockT> ChainSync<B> {
return Ok(None)
}
if let SyncMode::Warp = &self.mode {
if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none()
{
log::debug!(target: "sync", "Starting warp state sync.");
if let Some(provider) = &self.warp_sync_provider {
self.warp_sync =
Some(WarpSync::new(self.client.clone(), provider.clone()));
}
}
}
// If we are at genesis, just start downloading.
let (state, req) = if self.best_queued_number.is_zero() {
debug!(
@@ -792,7 +873,8 @@ impl<B: BlockT> ChainSync<B> {
/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ {
if self.pending_requests.is_empty() || self.state_sync.is_some() {
if self.pending_requests.is_empty() || self.state_sync.is_some() || self.warp_sync.is_some()
{
return Either::Left(std::iter::empty())
}
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
@@ -876,16 +958,16 @@ impl<B: BlockT> ChainSync<B> {
Either::Right(iter)
}
/// Get a state request, if any
/// Get a state request, if any.
pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) {
// Only one pending state request is allowed.
return None
}
if let Some(sync) = &self.state_sync {
if sync.is_complete() {
return None
}
if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) {
// Only one pending state request is allowed.
return None
}
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
trace!(target: "sync", "New StateRequest for {}", id);
@@ -895,6 +977,55 @@ impl<B: BlockT> ChainSync<B> {
}
}
}
if let Some(sync) = &self.warp_sync {
if sync.is_complete() {
return None
}
if let (Some(request), Some(target)) =
(sync.next_state_request(), sync.target_block_number())
{
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target {
trace!(target: "sync", "New StateRequest for {}", id);
peer.state = PeerSyncState::DownloadingState;
return Some((id.clone(), request))
}
}
}
}
None
}
/// Get a warp sync request, if any.
pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> {
if self
.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof)
{
// Only one pending state request is allowed.
return None
}
if let Some(sync) = &self.warp_sync {
if sync.is_complete() {
return None
}
if let Some(request) = sync.next_warp_poof_request() {
let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
if !targets.is_empty() {
targets.sort();
let median = targets[targets.len() / 2];
// Find a random peer that is synced as much as peer majority.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= median {
trace!(target: "sync", "New WarpProofRequest for {}", id);
peer.state = PeerSyncState::DownloadingWarpProof;
return Some((id.clone(), request))
}
}
}
}
}
None
}
@@ -1055,7 +1186,8 @@ impl<B: BlockT> ChainSync<B> {
},
PeerSyncState::Available |
PeerSyncState::DownloadingJustification(..) |
PeerSyncState::DownloadingState => Vec::new(),
PeerSyncState::DownloadingState |
PeerSyncState::DownloadingWarpProof => Vec::new(),
}
} else {
// When request.is_none() this is a block announcement. Just accept blocks.
@@ -1105,6 +1237,15 @@ impl<B: BlockT> ChainSync<B> {
response.proof.len(),
);
sync.import(response)
} else if let Some(sync) = &mut self.warp_sync {
debug!(
target: "sync",
"Importing state data from {} with {} keys, {} proof nodes.",
who,
response.entries.len(),
response.proof.len(),
);
sync.import_state(response)
} else {
debug!(target: "sync", "Ignored obsolete state response from {}", who);
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED))
@@ -1112,12 +1253,7 @@ impl<B: BlockT> ChainSync<B> {
match import_result {
state::ImportResult::Import(hash, header, state) => {
let origin = if self.status().state != SyncState::Downloading {
BlockOrigin::NetworkBroadcast
} else {
BlockOrigin::NetworkInitialSync
};
let origin = BlockOrigin::NetworkInitialSync;
let block = IncomingBlock {
hash,
header: Some(header),
@@ -1142,6 +1278,39 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Handle a response from the remote to a warp proof request that we made.
///
/// Returns next request.
pub fn on_warp_sync_data(
&mut self,
who: &PeerId,
response: warp::EncodedProof,
) -> Result<OnWarpSyncData<B>, BadPeer> {
let import_result = if let Some(sync) = &mut self.warp_sync {
debug!(
target: "sync",
"Importing warp proof data from {}, {} bytes.",
who,
response.0.len(),
);
sync.import_warp_proof(response)
} else {
debug!(target: "sync", "Ignored obsolete warp sync response from {}", who);
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED))
};
match import_result {
warp::WarpProofImportResult::StateRequest(request) =>
Ok(OnWarpSyncData::StateRequest(who.clone(), request)),
warp::WarpProofImportResult::WarpProofRequest(request) =>
Ok(OnWarpSyncData::WarpProofRequest(who.clone(), request)),
warp::WarpProofImportResult::BadResponse => {
debug!(target: "sync", "Bad proof data received from {}", who);
Err(BadPeer(who.clone(), rep::BAD_BLOCK))
},
}
}
fn validate_and_queue_blocks(
&mut self,
mut new_blocks: Vec<IncomingBlock<B>>,
@@ -1308,6 +1477,20 @@ impl<B: BlockT> ChainSync<B> {
self.mode = SyncMode::Full;
output.extend(self.restart());
}
let warp_sync_complete = self
.warp_sync
.as_ref()
.map_or(false, |s| s.target_block_hash() == Some(hash));
if warp_sync_complete {
info!(
target: "sync",
"Warp sync is complete ({} MiB), restarting block sync.",
self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)),
);
self.warp_sync = None;
self.mode = SyncMode::Full;
output.extend(self.restart());
}
},
Err(BlockImportError::IncompleteHeader(who)) =>
if let Some(peer) = who {
@@ -1349,6 +1532,7 @@ impl<B: BlockT> ChainSync<B> {
e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
warn!(target: "sync", "💔 Error importing block {:?}: {:?}", hash, e);
self.state_sync = None;
self.warp_sync = None;
output.extend(self.restart());
},
Err(BlockImportError::Cancelled) => {},
@@ -1828,6 +2012,13 @@ impl<B: BlockT> ChainSync<B> {
);
self.mode = SyncMode::Full;
}
if matches!(self.mode, SyncMode::Warp) && info.finalized_state.is_some() {
log::warn!(
target: "sync",
"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
);
self.mode = SyncMode::Full;
}
self.import_existing = false;
self.best_queued_hash = info.best_hash;
self.best_queued_number = info.best_number;
@@ -2253,7 +2444,8 @@ mod test {
let peer_id = PeerId::random();
let mut sync =
ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1).unwrap();
ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1, None)
.unwrap();
let (a1_hash, a1_number) = {
let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block;
@@ -2307,6 +2499,7 @@ mod test {
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
1,
None,
)
.unwrap();
@@ -2470,6 +2663,7 @@ mod test {
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
5,
None,
)
.unwrap();
@@ -2584,6 +2778,7 @@ mod test {
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
5,
None,
)
.unwrap();
@@ -2707,6 +2902,7 @@ mod test {
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
5,
None,
)
.unwrap();
@@ -2814,6 +3010,7 @@ mod test {
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
1,
None,
)
.unwrap();