Import target block body during warp sync (#12300)

* Receive and import target block body

* Request target block

* minor: wording

* Check for block body in the test

* Import target block justifications

* Fix: do not fail block validation if no justifications received

* Fix: import target blocks without justifications

Co-authored-by: arkpar <arkady.paronyan@gmail.com>
This commit is contained in:
Dmitry Markin
2022-09-20 18:05:44 +03:00
committed by GitHub
parent 6e424467a2
commit ea1c8bd2db
8 changed files with 211 additions and 31 deletions
+80 -10
View File
@@ -80,6 +80,7 @@ use std::{
pin::Pin,
sync::Arc,
};
use warp::TargetBlockImportResult;
mod extra_requests;
@@ -315,6 +316,8 @@ pub enum PeerSyncState<B: BlockT> {
DownloadingState,
/// Downloading warp proof.
DownloadingWarpProof,
/// Downloading warp sync target block.
DownloadingWarpTargetBlock,
/// Actively downloading block history after warp sync.
DownloadingGap(NumberFor<B>),
}
@@ -659,10 +662,11 @@ where
}
fn block_requests(&mut self) -> Box<dyn Iterator<Item = (&PeerId, BlockRequest<B>)> + '_> {
if self.allowed_requests.is_empty() ||
self.state_sync.is_some() ||
self.mode == SyncMode::Warp
{
if self.mode == SyncMode::Warp {
return Box::new(std::iter::once(self.warp_target_block_request()).flatten())
}
if self.allowed_requests.is_empty() || self.state_sync.is_some() {
return Box::new(std::iter::empty())
}
@@ -824,7 +828,7 @@ where
// Only one pending state request is allowed.
return None
}
if let Some(request) = sync.next_warp_poof_request() {
if let Some(request) = sync.next_warp_proof_request() {
let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
if !targets.is_empty() {
targets.sort();
@@ -1031,6 +1035,40 @@ where
Vec::new()
}
},
PeerSyncState::DownloadingWarpTargetBlock => {
peer.state = PeerSyncState::Available;
if let Some(warp_sync) = &mut self.warp_sync {
if blocks.len() == 1 {
validate_blocks::<B>(&blocks, who, Some(request))?;
match warp_sync.import_target_block(
blocks.pop().expect("`blocks` len checked above."),
) {
TargetBlockImportResult::Success =>
return Ok(OnBlockData::Continue),
TargetBlockImportResult::BadResponse =>
return Err(BadPeer(*who, rep::VERIFICATION_FAIL)),
}
} else if blocks.is_empty() {
debug!(target: "sync", "Empty block response from {}", who);
return Err(BadPeer(*who, rep::NO_BLOCK))
} else {
debug!(
target: "sync",
"Too many blocks ({}) in warp target block response from {}",
blocks.len(),
who,
);
return Err(BadPeer(*who, rep::NOT_REQUESTED))
}
} else {
debug!(
target: "sync",
"Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.",
who,
);
return Ok(OnBlockData::Continue)
}
},
PeerSyncState::Available |
PeerSyncState::DownloadingJustification(..) |
PeerSyncState::DownloadingState |
@@ -1112,14 +1150,14 @@ where
};
match import_result {
state::ImportResult::Import(hash, header, state) => {
state::ImportResult::Import(hash, header, state, body, justifications) => {
let origin = BlockOrigin::NetworkInitialSync;
let block = IncomingBlock {
hash,
header: Some(header),
body: None,
body,
indexed_body: None,
justifications: None,
justifications,
origin: None,
allow_missing_state: true,
import_existing: true,
@@ -1399,8 +1437,13 @@ where
number,
hash,
);
self.state_sync =
Some(StateSync::new(self.client.clone(), header, *skip_proofs));
self.state_sync = Some(StateSync::new(
self.client.clone(),
header,
None,
None,
*skip_proofs,
));
self.allowed_requests.set_all();
}
}
@@ -2163,6 +2206,33 @@ where
})
.collect()
}
/// Generate block request for downloading of the target block body during warp sync.
fn warp_target_block_request(&mut self) -> Option<(&PeerId, BlockRequest<B>)> {
if let Some(sync) = &self.warp_sync {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
self.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTargetBlock)
{
// Only one pending warp target block request is allowed.
return None
}
if let Some((target_number, request)) = sync.next_target_block_request() {
// Find a random peer that has a block with the target number.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target_number {
trace!(target: "sync", "New warp target block request for {}", id);
peer.state = PeerSyncState::DownloadingWarpTargetBlock;
self.allowed_requests.clear();
return Some((id, request))
}
}
}
}
None
}
}
// This is purely during a backwards compatible transitionary period and should be removed
+21 -6
View File
@@ -26,7 +26,10 @@ use sc_consensus::ImportedState;
use sc_network_common::sync::StateDownloadProgress;
use smallvec::SmallVec;
use sp_core::storage::well_known_keys;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use sp_runtime::{
traits::{Block as BlockT, Header, NumberFor},
Justifications,
};
use std::{collections::HashMap, sync::Arc};
/// State sync state machine. Accumulates partial state data until it
@@ -35,6 +38,8 @@ pub struct StateSync<B: BlockT, Client> {
target_block: B::Hash,
target_header: B::Header,
target_root: B::Hash,
target_body: Option<Vec<B::Extrinsic>>,
target_justifications: Option<Justifications>,
last_key: SmallVec<[Vec<u8>; 2]>,
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
complete: bool,
@@ -46,7 +51,7 @@ pub struct StateSync<B: BlockT, Client> {
/// Import state chunk result.
pub enum ImportResult<B: BlockT> {
/// State is complete and ready for import.
Import(B::Hash, B::Header, ImportedState<B>),
Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>, Option<Justifications>),
/// Continue downloading.
Continue,
/// Bad state chunk.
@@ -59,12 +64,20 @@ where
Client: ProofProvider<B> + Send + Sync + 'static,
{
/// Create a new instance.
pub fn new(client: Arc<Client>, target: B::Header, skip_proof: bool) -> Self {
pub fn new(
client: Arc<Client>,
target_header: B::Header,
target_body: Option<Vec<B::Extrinsic>>,
target_justifications: Option<Justifications>,
skip_proof: bool,
) -> Self {
Self {
client,
target_block: target.hash(),
target_root: *target.state_root(),
target_header: target,
target_block: target_header.hash(),
target_root: *target_header.state_root(),
target_header,
target_body,
target_justifications,
last_key: SmallVec::default(),
state: HashMap::default(),
complete: false,
@@ -213,6 +226,8 @@ where
block: self.target_block,
state: std::mem::take(&mut self.state).into(),
},
self.target_body.clone(),
self.target_justifications.clone(),
)
} else {
ImportResult::Continue
+93 -12
View File
@@ -23,17 +23,21 @@ use crate::{
state::{ImportResult, StateSync},
};
use sc_client_api::ProofProvider;
use sc_network_common::sync::warp::{
EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress,
WarpSyncProvider,
use sc_network_common::sync::{
message::{BlockAttributes, BlockData, BlockRequest, Direction, FromBlock},
warp::{
EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress,
WarpSyncProvider,
},
};
use sp_blockchain::HeaderBackend;
use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
use std::sync::Arc;
enum Phase<B: BlockT, Client> {
WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash },
TargetBlock(B::Header),
State(StateSync<B, Client>),
}
@@ -45,6 +49,14 @@ pub enum WarpProofImportResult {
BadResponse,
}
/// Import target block result.
pub enum TargetBlockImportResult {
/// Import was successful.
Success,
/// Invalid block.
BadResponse,
}
/// Warp sync state machine. Accumulates warp proofs and state.
pub struct WarpSync<B: BlockT, Client> {
phase: Phase<B, Client>,
@@ -72,7 +84,7 @@ where
/// Validate and import a state response.
pub fn import_state(&mut self, response: StateResponse) -> ImportResult<B> {
match &mut self.phase {
Phase::WarpProof { .. } => {
Phase::WarpProof { .. } | Phase::TargetBlock(_) => {
log::debug!(target: "sync", "Unexpected state response");
ImportResult::BadResponse
},
@@ -83,7 +95,7 @@ where
/// Validate and import a warp proof response.
pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult {
match &mut self.phase {
Phase::State(_) => {
Phase::State(_) | Phase::TargetBlock(_) => {
log::debug!(target: "sync", "Unexpected warp proof response");
WarpProofImportResult::BadResponse
},
@@ -104,8 +116,7 @@ where
Ok(VerificationResult::Complete(new_set_id, _, header)) => {
log::debug!(target: "sync", "Verified complete proof, set_id={:?}", new_set_id);
self.total_proof_bytes += response.0.len() as u64;
let state_sync = StateSync::new(self.client.clone(), header, false);
self.phase = Phase::State(state_sync);
self.phase = Phase::TargetBlock(header);
WarpProofImportResult::Success
},
}
@@ -113,35 +124,100 @@ where
}
}
/// Import the target block body.
pub fn import_target_block(&mut self, block: BlockData<B>) -> TargetBlockImportResult {
match &mut self.phase {
Phase::WarpProof { .. } | Phase::State(_) => {
log::debug!(target: "sync", "Unexpected target block response");
TargetBlockImportResult::BadResponse
},
Phase::TargetBlock(header) =>
if let Some(block_header) = &block.header {
if block_header == header {
if block.body.is_some() {
let state_sync = StateSync::new(
self.client.clone(),
header.clone(),
block.body,
block.justifications,
false,
);
self.phase = Phase::State(state_sync);
TargetBlockImportResult::Success
} else {
log::debug!(
target: "sync",
"Importing target block failed: missing body.",
);
TargetBlockImportResult::BadResponse
}
} else {
log::debug!(
target: "sync",
"Importing target block failed: different header.",
);
TargetBlockImportResult::BadResponse
}
} else {
log::debug!(target: "sync", "Importing target block failed: missing header.");
TargetBlockImportResult::BadResponse
},
}
}
/// Produce next state request.
pub fn next_state_request(&self) -> Option<StateRequest> {
match &self.phase {
Phase::WarpProof { .. } => None,
Phase::TargetBlock(_) => None,
Phase::State(sync) => Some(sync.next_request()),
}
}
/// Produce next warp proof request.
pub fn next_warp_poof_request(&self) -> Option<WarpProofRequest<B>> {
pub fn next_warp_proof_request(&self) -> Option<WarpProofRequest<B>> {
match &self.phase {
Phase::State(_) => None,
Phase::WarpProof { last_hash, .. } => Some(WarpProofRequest { begin: *last_hash }),
Phase::TargetBlock(_) => None,
Phase::State(_) => None,
}
}
/// Produce next target block request.
pub fn next_target_block_request(&self) -> Option<(NumberFor<B>, BlockRequest<B>)> {
match &self.phase {
Phase::WarpProof { .. } => None,
Phase::TargetBlock(header) => {
let request = BlockRequest::<B> {
id: 0,
fields: BlockAttributes::HEADER |
BlockAttributes::BODY | BlockAttributes::JUSTIFICATION,
from: FromBlock::Hash(header.hash()),
to: Some(header.hash()),
direction: Direction::Ascending,
max: Some(1),
};
Some((*header.number(), request))
},
Phase::State(_) => None,
}
}
/// Return target block hash if it is known.
pub fn target_block_hash(&self) -> Option<B::Hash> {
match &self.phase {
Phase::State(s) => Some(s.target()),
Phase::WarpProof { .. } => None,
Phase::TargetBlock(_) => None,
Phase::State(s) => Some(s.target()),
}
}
/// Return target block number if it is known.
pub fn target_block_number(&self) -> Option<NumberFor<B>> {
match &self.phase {
Phase::State(s) => Some(s.target_block_num()),
Phase::WarpProof { .. } => None,
Phase::TargetBlock(header) => Some(*header.number()),
Phase::State(s) => Some(s.target_block_num()),
}
}
@@ -149,6 +225,7 @@ where
pub fn is_complete(&self) -> bool {
match &self.phase {
Phase::WarpProof { .. } => false,
Phase::TargetBlock(_) => false,
Phase::State(sync) => sync.is_complete(),
}
}
@@ -160,6 +237,10 @@ where
phase: WarpSyncPhase::DownloadingWarpProofs,
total_bytes: self.total_proof_bytes,
},
Phase::TargetBlock(_) => WarpSyncProgress {
phase: WarpSyncPhase::DownloadingTargetBlock,
total_bytes: self.total_proof_bytes,
},
Phase::State(sync) => WarpSyncProgress {
phase: if self.is_complete() {
WarpSyncPhase::ImportingState