mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 22:47:56 +00:00
Update common block in sync after importing blocks of a peer, please read UPDATE (#7733)
* Update common block in sync after importing blocks of a peer This updates the sync code to update the common block of a peer, after we have imported blocks from this peer. This fixes a bug for when we are connected to one or more nodes that are doing a full sync, as our node is. Nodes in full sync will not announce new blocks, as we don't send import notifications on full sync. The problem was that we were connected to some peer that reported some low number as its best and we tried to sync these blocks. But, as we did not update the common block of this peer, we would sync these blocks over and over again, being captured in some time warp. The solution to this problem is that we increase the common number as we import blocks from this peer. * Test * Test name.. * Fix test * Cleanup some code and write some new regression test * Implement the ancestor search * Check that the common number is smaller than the last finalized block * Update client/network/src/protocol/sync.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> * Update client/network/src/protocol/sync.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> * Update client/network/src/protocol/sync.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> * Change the way we build the status messages * Start some new test... * Finish test * Rename test * Update client/network/src/protocol.rs Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
@@ -67,6 +67,10 @@ const MAX_IMPORTING_BLOCKS: usize = 2048;
|
||||
/// Maximum blocks to download ahead of any gap.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// Maximum blocks to look backwards. The gap is the difference between the highest block and the
/// common block of a node.
///
/// When the gap to a peer's common block exceeds this, an ancestor search is started instead of
/// requesting blocks forward.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
|
||||
|
||||
/// Maximum number of concurrent block announce validations.
|
||||
///
|
||||
/// If the queue reaches the maximum, we drop any new block
|
||||
@@ -211,6 +215,8 @@ pub struct ChainSync<B: BlockT> {
|
||||
/// All the data we have about a Peer that we are trying to sync with
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PeerSync<B: BlockT> {
|
||||
/// Peer id of this peer.
|
||||
pub peer_id: PeerId,
|
||||
/// The common number is the block number that is a common point of
|
||||
/// ancestry for both our chains (as far as we know).
|
||||
pub common_number: NumberFor<B>,
|
||||
@@ -223,6 +229,22 @@ pub struct PeerSync<B: BlockT> {
|
||||
pub state: PeerSyncState<B>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> PeerSync<B> {
|
||||
/// Update the `common_number` iff `new_common > common_number`.
|
||||
fn update_common_number(&mut self, new_common: NumberFor<B>) {
|
||||
if self.common_number < new_common {
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Updating peer {} common number from={} => to={}.",
|
||||
self.peer_id,
|
||||
self.common_number,
|
||||
new_common,
|
||||
);
|
||||
self.common_number = new_common;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The sync status of a peer we are trying to sync with
|
||||
#[derive(Debug)]
|
||||
pub struct PeerInfo<B: BlockT> {
|
||||
@@ -264,11 +286,7 @@ pub enum PeerSyncState<B: BlockT> {
|
||||
|
||||
impl<B: BlockT> PeerSyncState<B> {
	/// Returns `true` when the peer is in the [`PeerSyncState::Available`] state.
	// NOTE: the diff residue contained both the old `if let … { true } else { false }` form and
	// the new `matches!` form back to back; only the `matches!` form is kept — the duplicated
	// old branch was dead/contradictory code.
	pub fn is_available(&self) -> bool {
		matches!(self, Self::Available)
	}
}
|
||||
|
||||
@@ -315,6 +333,18 @@ pub enum OnBlockData<B: BlockT> {
|
||||
Request(PeerId, BlockRequest<B>)
|
||||
}
|
||||
|
||||
impl<B: BlockT> OnBlockData<B> {
	/// Returns `self` as request.
	///
	/// Yields `Some((peer, request))` for the `Request` variant and `None` otherwise.
	#[cfg(test)]
	fn into_request(self) -> Option<(PeerId, BlockRequest<B>)> {
		match self {
			Self::Request(peer, req) => Some((peer, req)),
			_ => None,
		}
	}
}
|
||||
|
||||
/// Result of [`ChainSync::poll_block_announce_validation`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum PollBlockAnnounceValidation<H> {
|
||||
@@ -512,7 +542,8 @@ impl<B: BlockT> ChainSync<B> {
|
||||
self.best_queued_hash,
|
||||
self.best_queued_number
|
||||
);
|
||||
self.peers.insert(who, PeerSync {
|
||||
self.peers.insert(who.clone(), PeerSync {
|
||||
peer_id: who,
|
||||
common_number: self.best_queued_number,
|
||||
best_hash,
|
||||
best_number,
|
||||
@@ -522,43 +553,55 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
// If we are at genesis, just start downloading.
|
||||
if self.best_queued_number.is_zero() {
|
||||
debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number);
|
||||
self.peers.insert(who.clone(), PeerSync {
|
||||
common_number: Zero::zero(),
|
||||
let (state, req) = if self.best_queued_number.is_zero() {
|
||||
debug!(
|
||||
target:"sync",
|
||||
"New peer with best hash {} ({}).",
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::Available,
|
||||
});
|
||||
self.pending_requests.add(&who);
|
||||
return Ok(None)
|
||||
}
|
||||
);
|
||||
|
||||
let common_best = std::cmp::min(self.best_queued_number, best_number);
|
||||
(PeerSyncState::Available, None)
|
||||
} else {
|
||||
let common_best = std::cmp::min(self.best_queued_number, best_number);
|
||||
|
||||
debug!(target:"sync",
|
||||
"New peer with unknown best hash {} ({}), searching for common ancestor.",
|
||||
best_hash,
|
||||
best_number
|
||||
);
|
||||
debug!(
|
||||
target:"sync",
|
||||
"New peer with unknown best hash {} ({}), searching for common ancestor.",
|
||||
best_hash,
|
||||
best_number
|
||||
);
|
||||
|
||||
(
|
||||
PeerSyncState::AncestorSearch {
|
||||
current: common_best,
|
||||
start: self.best_queued_number,
|
||||
state: AncestorSearchState::ExponentialBackoff(One::one()),
|
||||
},
|
||||
Some(ancestry_request::<B>(common_best))
|
||||
)
|
||||
};
|
||||
|
||||
self.pending_requests.add(&who);
|
||||
self.peers.insert(who, PeerSync {
|
||||
self.peers.insert(who.clone(), PeerSync {
|
||||
peer_id: who,
|
||||
common_number: Zero::zero(),
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::AncestorSearch {
|
||||
current: common_best,
|
||||
start: self.best_queued_number,
|
||||
state: AncestorSearchState::ExponentialBackoff(One::one()),
|
||||
},
|
||||
state,
|
||||
});
|
||||
|
||||
Ok(Some(ancestry_request::<B>(common_best)))
|
||||
Ok(req)
|
||||
}
|
||||
Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => {
|
||||
debug!(target:"sync", "New peer with known best hash {} ({}).", best_hash, best_number);
|
||||
debug!(
|
||||
target: "sync",
|
||||
"New peer with known best hash {} ({}).",
|
||||
best_hash,
|
||||
best_number,
|
||||
);
|
||||
self.peers.insert(who.clone(), PeerSync {
|
||||
peer_id: who.clone(),
|
||||
common_number: best_number,
|
||||
best_hash,
|
||||
best_number,
|
||||
@@ -687,7 +730,21 @@ impl<B: BlockT> ChainSync<B> {
|
||||
return None
|
||||
}
|
||||
|
||||
if let Some((range, req)) = peer_block_request(
|
||||
// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the
|
||||
// common number, the peer best number is higher than our best queued and the common
|
||||
// number is smaller than the last finalized block number, we should do an ancestor
|
||||
// search to find a better common block.
|
||||
if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
|
||||
&& best_queued < peer.best_number && peer.common_number < last_finalized
|
||||
{
|
||||
let current = std::cmp::min(peer.best_number, best_queued);
|
||||
peer.state = PeerSyncState::AncestorSearch {
|
||||
current,
|
||||
start: best_queued,
|
||||
state: AncestorSearchState::ExponentialBackoff(One::one()),
|
||||
};
|
||||
Some((id, ancestry_request::<B>(current)))
|
||||
} else if let Some((range, req)) = peer_block_request(
|
||||
id,
|
||||
peer,
|
||||
blocks,
|
||||
@@ -795,15 +852,29 @@ impl<B: BlockT> ChainSync<B> {
|
||||
PeerSyncState::AncestorSearch { current, start, state } => {
|
||||
let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
|
||||
(Some(block), Ok(maybe_our_block_hash)) => {
|
||||
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", current, block.hash, who);
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Got ancestry block #{} ({}) from peer {}",
|
||||
current,
|
||||
block.hash,
|
||||
who,
|
||||
);
|
||||
maybe_our_block_hash.filter(|x| x == &block.hash)
|
||||
},
|
||||
(None, _) => {
|
||||
debug!(target: "sync", "Invalid response when searching for ancestor from {}", who);
|
||||
debug!(
|
||||
target: "sync",
|
||||
"Invalid response when searching for ancestor from {}",
|
||||
who,
|
||||
);
|
||||
return Err(BadPeer(who.clone(), rep::UNKNOWN_ANCESTOR))
|
||||
},
|
||||
(_, Err(e)) => {
|
||||
info!("❌ Error answering legitimate blockchain query: {:?}", e);
|
||||
info!(
|
||||
target: "sync",
|
||||
"❌ Error answering legitimate blockchain query: {:?}",
|
||||
e,
|
||||
);
|
||||
return Err(BadPeer(who.clone(), rep::BLOCKCHAIN_READ_ERROR))
|
||||
}
|
||||
};
|
||||
@@ -822,17 +893,23 @@ impl<B: BlockT> ChainSync<B> {
|
||||
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
|
||||
return Err(BadPeer(who.clone(), rep::GENESIS_MISMATCH))
|
||||
}
|
||||
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) {
|
||||
if let Some((next_state, next_num)) =
|
||||
handle_ancestor_search_state(state, *current, matching_hash.is_some())
|
||||
{
|
||||
peer.state = PeerSyncState::AncestorSearch {
|
||||
current: next_num,
|
||||
start: *start,
|
||||
state: next_state,
|
||||
};
|
||||
return Ok(OnBlockData::Request(who.clone(), ancestry_request::<B>(next_num)))
|
||||
return Ok(
|
||||
OnBlockData::Request(who.clone(), ancestry_request::<B>(next_num))
|
||||
)
|
||||
} else {
|
||||
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
|
||||
// add it to sync targets if necessary.
|
||||
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
|
||||
self.best_queued_hash,
|
||||
self.best_queued_number,
|
||||
peer.best_hash,
|
||||
@@ -843,7 +920,12 @@ impl<B: BlockT> ChainSync<B> {
|
||||
if peer.common_number < peer.best_number
|
||||
&& peer.best_number < self.best_queued_number
|
||||
{
|
||||
trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who);
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Added fork target {} for {}",
|
||||
peer.best_hash,
|
||||
who,
|
||||
);
|
||||
self.fork_targets
|
||||
.entry(peer.best_hash.clone())
|
||||
.or_insert_with(|| ForkTarget {
|
||||
@@ -991,7 +1073,11 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(BlockImportResult::ImportedKnown(_number)) => {}
|
||||
Ok(BlockImportResult::ImportedKnown(number, who)) => {
|
||||
if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
|
||||
peer.update_common_number(number);
|
||||
}
|
||||
}
|
||||
Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => {
|
||||
if aux.clear_justification_requests {
|
||||
trace!(
|
||||
@@ -1004,38 +1090,61 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
if aux.needs_justification {
|
||||
trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash);
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Block imported but requires justification {}: {:?}",
|
||||
number,
|
||||
hash,
|
||||
);
|
||||
self.request_justification(&hash, number);
|
||||
}
|
||||
|
||||
if aux.bad_justification {
|
||||
if let Some(peer) = who {
|
||||
if let Some(ref peer) = who {
|
||||
info!("💔 Sent block with bad justification to import");
|
||||
output.push(Err(BadPeer(peer, rep::BAD_JUSTIFICATION)));
|
||||
output.push(Err(BadPeer(peer.clone(), rep::BAD_JUSTIFICATION)));
|
||||
}
|
||||
}
|
||||
|
||||
if number > self.best_imported_number {
|
||||
self.best_imported_number = number;
|
||||
}
|
||||
|
||||
if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
|
||||
peer.update_common_number(number);
|
||||
}
|
||||
},
|
||||
Err(BlockImportError::IncompleteHeader(who)) => {
|
||||
if let Some(peer) = who {
|
||||
warn!("💔 Peer sent block with incomplete header to import");
|
||||
warn!(
|
||||
target: "sync",
|
||||
"💔 Peer sent block with incomplete header to import",
|
||||
);
|
||||
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
|
||||
output.extend(self.restart());
|
||||
}
|
||||
},
|
||||
Err(BlockImportError::VerificationFailed(who, e)) => {
|
||||
if let Some(peer) = who {
|
||||
warn!("💔 Verification failed for block {:?} received from peer: {}, {:?}", hash, peer, e);
|
||||
warn!(
|
||||
target: "sync",
|
||||
"💔 Verification failed for block {:?} received from peer: {}, {:?}",
|
||||
hash,
|
||||
peer,
|
||||
e,
|
||||
);
|
||||
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
|
||||
output.extend(self.restart());
|
||||
}
|
||||
},
|
||||
Err(BlockImportError::BadBlock(who)) => {
|
||||
if let Some(peer) = who {
|
||||
info!("💔 Block {:?} received from peer {} has been blacklisted", hash, peer);
|
||||
info!(
|
||||
target: "sync",
|
||||
"💔 Block {:?} received from peer {} has been blacklisted",
|
||||
hash,
|
||||
peer,
|
||||
);
|
||||
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
|
||||
}
|
||||
},
|
||||
@@ -1074,7 +1183,11 @@ impl<B: BlockT> ChainSync<B> {
|
||||
});
|
||||
|
||||
if let Err(err) = r {
|
||||
warn!(target: "sync", "💔 Error cleaning up pending extra justification data requests: {:?}", err);
|
||||
warn!(
|
||||
target: "sync",
|
||||
"💔 Error cleaning up pending extra justification data requests: {:?}",
|
||||
err,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1279,6 +1392,12 @@ impl<B: BlockT> ChainSync<B> {
|
||||
&mut self,
|
||||
pre_validation_result: PreValidateBlockAnnounce<B::Header>,
|
||||
) -> PollBlockAnnounceValidation<B::Header> {
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Finished block announce validation: {:?}",
|
||||
pre_validation_result,
|
||||
);
|
||||
|
||||
let (announce, is_best, who) = match pre_validation_result {
|
||||
PreValidateBlockAnnounce::Nothing { is_best, who, announce } => {
|
||||
self.peer_block_announce_validation_finished(&who);
|
||||
@@ -1316,6 +1435,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
if let PeerSyncState::AncestorSearch {..} = peer.state {
|
||||
trace!(target: "sync", "Peer state is ancestor search.");
|
||||
return PollBlockAnnounceValidation::Nothing { is_best, who, header }
|
||||
}
|
||||
|
||||
@@ -1323,11 +1443,11 @@ impl<B: BlockT> ChainSync<B> {
|
||||
// is either one further ahead or it's the one they just announced, if we know about it.
|
||||
if is_best {
|
||||
if known && self.best_queued_number >= number {
|
||||
peer.common_number = number
|
||||
peer.update_common_number(number);
|
||||
} else if header.parent_hash() == &self.best_queued_hash
|
||||
|| known_parent && self.best_queued_number >= number
|
||||
{
|
||||
peer.common_number = number - One::one();
|
||||
peer.update_common_number(number - One::one());
|
||||
}
|
||||
}
|
||||
self.pending_requests.add(&who);
|
||||
@@ -1367,6 +1487,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
.peers.insert(who.clone());
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Announce validation result is nothing");
|
||||
PollBlockAnnounceValidation::Nothing { is_best, who, header }
|
||||
}
|
||||
|
||||
@@ -1485,7 +1606,7 @@ pub enum AncestorSearchState<B: BlockT> {
|
||||
fn handle_ancestor_search_state<B: BlockT>(
|
||||
state: &AncestorSearchState<B>,
|
||||
curr_block_num: NumberFor<B>,
|
||||
block_hash_match: bool
|
||||
block_hash_match: bool,
|
||||
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
|
||||
let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
|
||||
match state {
|
||||
@@ -1536,44 +1657,41 @@ fn peer_block_request<B: BlockT>(
|
||||
if best_num >= peer.best_number {
|
||||
// Will be downloaded as alternative fork instead.
|
||||
return None;
|
||||
}
|
||||
if peer.common_number < finalized {
|
||||
} else if peer.common_number < finalized {
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
|
||||
id, peer.common_number, finalized, peer.best_number, best_num,
|
||||
);
|
||||
}
|
||||
if let Some(range) = blocks.needed_blocks(
|
||||
let range = blocks.needed_blocks(
|
||||
id.clone(),
|
||||
MAX_BLOCKS_TO_REQUEST,
|
||||
peer.best_number,
|
||||
peer.common_number,
|
||||
max_parallel_downloads,
|
||||
MAX_DOWNLOAD_AHEAD,
|
||||
) {
|
||||
// The end is not part of the range.
|
||||
let last = range.end.saturating_sub(One::one());
|
||||
)?;
|
||||
|
||||
let from = if peer.best_number == last {
|
||||
message::FromBlock::Hash(peer.best_hash)
|
||||
} else {
|
||||
message::FromBlock::Number(last)
|
||||
};
|
||||
// The end is not part of the range.
|
||||
let last = range.end.saturating_sub(One::one());
|
||||
|
||||
let request = message::generic::BlockRequest {
|
||||
id: 0,
|
||||
fields: attrs.clone(),
|
||||
from,
|
||||
to: None,
|
||||
direction: message::Direction::Descending,
|
||||
max: Some((range.end - range.start).saturated_into::<u32>())
|
||||
};
|
||||
|
||||
Some((range, request))
|
||||
let from = if peer.best_number == last {
|
||||
message::FromBlock::Hash(peer.best_hash)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
message::FromBlock::Number(last)
|
||||
};
|
||||
|
||||
let request = message::generic::BlockRequest {
|
||||
id: 0,
|
||||
fields: attrs.clone(),
|
||||
from,
|
||||
to: None,
|
||||
direction: message::Direction::Descending,
|
||||
max: Some((range.end - range.start).saturated_into::<u32>())
|
||||
};
|
||||
|
||||
Some((range, request))
|
||||
}
|
||||
|
||||
/// Get pending fork sync targets for a peer.
|
||||
@@ -1750,7 +1868,7 @@ mod test {
|
||||
use substrate_test_runtime_client::{
|
||||
runtime::{Block, Hash, Header},
|
||||
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
|
||||
BlockBuilderExt,
|
||||
BlockBuilderExt, TestClient, ClientExt,
|
||||
};
|
||||
use futures::{future::poll_fn, executor::block_on};
|
||||
|
||||
@@ -1948,11 +2066,14 @@ mod test {
|
||||
/// Get a block request from `sync` and check that is matches the expected request.
|
||||
fn get_block_request(
|
||||
sync: &mut ChainSync<Block>,
|
||||
from: message::FromBlock<Hash, u64>,
|
||||
from: FromBlock<Hash, u64>,
|
||||
max: u32,
|
||||
peer: &PeerId,
|
||||
) -> BlockRequest<Block> {
|
||||
let requests = sync.block_requests().collect::<Vec<_>>();
|
||||
|
||||
log::trace!(target: "sync", "Requests: {:?}", requests);
|
||||
|
||||
assert_eq!(1, requests.len());
|
||||
assert_eq!(peer, requests[0].0);
|
||||
|
||||
@@ -1963,6 +2084,26 @@ mod test {
|
||||
request
|
||||
}
|
||||
|
||||
/// Build and import a new best block.
|
||||
fn build_block(client: &mut Arc<TestClient>, at: Option<Hash>, fork: bool) -> Block {
|
||||
let at = at.unwrap_or_else(|| client.info().best_hash);
|
||||
|
||||
let mut block_builder = client.new_block_at(
|
||||
&BlockId::Hash(at),
|
||||
Default::default(),
|
||||
false,
|
||||
).unwrap();
|
||||
|
||||
if fork {
|
||||
block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
|
||||
}
|
||||
|
||||
let block = block_builder.build().unwrap().block;
|
||||
|
||||
client.import(BlockOrigin::Own, block.clone()).unwrap();
|
||||
block
|
||||
}
|
||||
|
||||
/// This test is a regression test as observed on a real network.
|
||||
///
|
||||
/// The node is connected to multiple peers. Both of these peers are having a best block (1) that
|
||||
@@ -1990,14 +2131,6 @@ mod test {
|
||||
let peer_id1 = PeerId::random();
|
||||
let peer_id2 = PeerId::random();
|
||||
|
||||
let mut client2 = client.clone();
|
||||
let mut build_block = || {
|
||||
let block = client2.new_block(Default::default()).unwrap().build().unwrap().block;
|
||||
client2.import(BlockOrigin::Own, block.clone()).unwrap();
|
||||
|
||||
block
|
||||
};
|
||||
|
||||
let mut client2 = client.clone();
|
||||
let mut build_block_at = |at, import| {
|
||||
let mut block_builder = client2.new_block_at(&BlockId::Hash(at), Default::default(), false)
|
||||
@@ -2014,9 +2147,9 @@ mod test {
|
||||
block
|
||||
};
|
||||
|
||||
let block1 = build_block();
|
||||
let block2 = build_block();
|
||||
let block3 = build_block();
|
||||
let block1 = build_block(&mut client, None, false);
|
||||
let block2 = build_block(&mut client, None, false);
|
||||
let block3 = build_block(&mut client, None, false);
|
||||
let block3_fork = build_block_at(block2.hash(), false);
|
||||
|
||||
// Add two peers which are on block 1.
|
||||
@@ -2073,4 +2206,253 @@ mod test {
|
||||
// Nothing to import
|
||||
assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty()));
|
||||
}
|
||||
|
||||
fn unwrap_from_block_number(from: FromBlock<Hash, u64>) -> u64 {
|
||||
if let FromBlock::Number(from) = from {
|
||||
from
|
||||
} else {
|
||||
panic!("Expected a number!");
|
||||
}
|
||||
}
|
||||
|
||||
/// A regression test for a behavior we have seen on a live network.
|
||||
///
|
||||
/// The scenario is that the node is doing a full resync and is connected to some node that is
|
||||
/// doing a major sync as well. This other node that is doing a major sync will finish before
|
||||
/// our node and send a block announcement message, but we don't have seen any block announcement
|
||||
/// from this node in its sync process. Meaning our common number didn't change. It is now expected
|
||||
/// that we start an ancestor search to find the common number.
|
||||
#[test]
|
||||
fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let blocks = {
|
||||
let mut client = Arc::new(TestClientBuilder::new().build());
|
||||
(0..MAX_DOWNLOAD_AHEAD * 2).map(|_| build_block(&mut client, None, false)).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let mut client = Arc::new(TestClientBuilder::new().build());
|
||||
let info = client.info();
|
||||
|
||||
let mut sync = ChainSync::new(
|
||||
Roles::AUTHORITY,
|
||||
client.clone(),
|
||||
&info,
|
||||
Box::new(DefaultBlockAnnounceValidator),
|
||||
5,
|
||||
);
|
||||
|
||||
let peer_id1 = PeerId::random();
|
||||
let peer_id2 = PeerId::random();
|
||||
|
||||
let best_block = blocks.last().unwrap().clone();
|
||||
// Connect the node we will sync from
|
||||
sync.new_peer(peer_id1.clone(), best_block.hash(), *best_block.header().number()).unwrap();
|
||||
sync.new_peer(peer_id2.clone(), info.best_hash, 0).unwrap();
|
||||
|
||||
let mut best_block_num = 0;
|
||||
while best_block_num < MAX_DOWNLOAD_AHEAD {
|
||||
let request = get_block_request(
|
||||
&mut sync,
|
||||
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
|
||||
MAX_BLOCKS_TO_REQUEST as u32,
|
||||
&peer_id1,
|
||||
);
|
||||
|
||||
let from = unwrap_from_block_number(request.from.clone());
|
||||
|
||||
let mut resp_blocks = blocks[best_block_num as usize..from as usize].to_vec();
|
||||
resp_blocks.reverse();
|
||||
|
||||
let response = create_block_response(resp_blocks.clone());
|
||||
|
||||
let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
|
||||
assert!(
|
||||
matches!(
|
||||
res,
|
||||
OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
|
||||
),
|
||||
);
|
||||
|
||||
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
|
||||
|
||||
resp_blocks.into_iter()
|
||||
.rev()
|
||||
.for_each(|b| client.import_as_final(BlockOrigin::Own, b).unwrap());
|
||||
}
|
||||
|
||||
// Let peer2 announce that it finished syncing
|
||||
send_block_announce(best_block.header().clone(), &peer_id2, &mut sync);
|
||||
|
||||
let (peer1_req, peer2_req) = sync.block_requests().fold((None, None), |res, req| {
|
||||
if req.0 == &peer_id1 {
|
||||
(Some(req.1), res.1)
|
||||
} else if req.0 == &peer_id2 {
|
||||
(res.0, Some(req.1))
|
||||
} else {
|
||||
panic!("Unexpected req: {:?}", req)
|
||||
}
|
||||
});
|
||||
|
||||
// We should now do an ancestor search to find the correct common block.
|
||||
let peer2_req = peer2_req.unwrap();
|
||||
assert_eq!(Some(1), peer2_req.max);
|
||||
assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from);
|
||||
|
||||
let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]);
|
||||
let res = sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap();
|
||||
assert!(
|
||||
matches!(
|
||||
res,
|
||||
OnBlockData::Import(_, blocks) if blocks.is_empty()
|
||||
),
|
||||
);
|
||||
|
||||
let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from);
|
||||
|
||||
// As we are on the same chain, we should directly continue with requesting blocks from
|
||||
// peer 2 as well.
|
||||
get_block_request(
|
||||
&mut sync,
|
||||
FromBlock::Number(peer1_from + MAX_BLOCKS_TO_REQUEST as u64),
|
||||
MAX_BLOCKS_TO_REQUEST as u32,
|
||||
&peer_id2,
|
||||
);
|
||||
}
|
||||
|
||||
/// A test that ensures that we can sync a huge fork.
|
||||
///
|
||||
/// The following scenario:
|
||||
/// A peer connects to us and we both have the common block 512. The last finalized is 2048.
|
||||
/// Our best block is 4096. The peer send us a block announcement with 4097 from a fork.
|
||||
///
|
||||
/// We will first do an ancestor search to find the common block. After that we start to sync
|
||||
/// the fork and finish it ;)
|
||||
#[test]
|
||||
fn can_sync_huge_fork() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let mut client = Arc::new(TestClientBuilder::new().build());
|
||||
let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4)
|
||||
.map(|_| build_block(&mut client, None, false))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let fork_blocks = {
|
||||
let mut client = Arc::new(TestClientBuilder::new().build());
|
||||
let fork_blocks = blocks[..MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2]
|
||||
.into_iter()
|
||||
.inspect(|b| client.import(BlockOrigin::Own, (*b).clone()).unwrap())
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
fork_blocks.into_iter().chain(
|
||||
(0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1)
|
||||
.map(|_| build_block(&mut client, None, true))
|
||||
).collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let info = client.info();
|
||||
|
||||
let mut sync = ChainSync::new(
|
||||
Roles::AUTHORITY,
|
||||
client.clone(),
|
||||
&info,
|
||||
Box::new(DefaultBlockAnnounceValidator),
|
||||
5,
|
||||
);
|
||||
|
||||
let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
|
||||
client.finalize_block(BlockId::Hash(finalized_block.hash()), Some(Vec::new())).unwrap();
|
||||
sync.update_chain_info(&info.best_hash, info.best_number);
|
||||
|
||||
let peer_id1 = PeerId::random();
|
||||
|
||||
let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone();
|
||||
// Connect the node we will sync from
|
||||
sync.new_peer(peer_id1.clone(), common_block.hash(), *common_block.header().number()).unwrap();
|
||||
|
||||
send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync);
|
||||
|
||||
let mut request = get_block_request(
|
||||
&mut sync,
|
||||
FromBlock::Number(info.best_number),
|
||||
1,
|
||||
&peer_id1,
|
||||
);
|
||||
|
||||
// Do the ancestor search
|
||||
loop {
|
||||
let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
|
||||
let response = create_block_response(vec![block.clone()]);
|
||||
|
||||
let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
|
||||
request = match on_block_data.into_request() {
|
||||
Some(req) => req.1,
|
||||
// We found the ancenstor
|
||||
None => break,
|
||||
};
|
||||
|
||||
log::trace!(target: "sync", "Request: {:?}", request);
|
||||
}
|
||||
|
||||
// Now request and import the fork.
|
||||
let mut best_block_num = finalized_block.header().number().clone() as u32;
|
||||
while best_block_num < *fork_blocks.last().unwrap().header().number() as u32 - 1 {
|
||||
let request = get_block_request(
|
||||
&mut sync,
|
||||
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
|
||||
MAX_BLOCKS_TO_REQUEST as u32,
|
||||
&peer_id1,
|
||||
);
|
||||
|
||||
let from = unwrap_from_block_number(request.from.clone());
|
||||
|
||||
let mut resp_blocks = fork_blocks[best_block_num as usize..from as usize].to_vec();
|
||||
resp_blocks.reverse();
|
||||
|
||||
let response = create_block_response(resp_blocks.clone());
|
||||
|
||||
let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
|
||||
assert!(
|
||||
matches!(
|
||||
res,
|
||||
OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
|
||||
),
|
||||
);
|
||||
|
||||
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
|
||||
|
||||
let _ = sync.on_blocks_processed(
|
||||
MAX_BLOCKS_TO_REQUEST as usize,
|
||||
MAX_BLOCKS_TO_REQUEST as usize,
|
||||
resp_blocks.iter()
|
||||
.rev()
|
||||
.map(|b|
|
||||
(
|
||||
Ok(
|
||||
BlockImportResult::ImportedUnknown(
|
||||
b.header().number().clone(),
|
||||
Default::default(),
|
||||
Some(peer_id1.clone()),
|
||||
)
|
||||
),
|
||||
b.hash(),
|
||||
)
|
||||
)
|
||||
.collect()
|
||||
);
|
||||
|
||||
resp_blocks.into_iter()
|
||||
.rev()
|
||||
.for_each(|b| client.import(BlockOrigin::Own, b).unwrap());
|
||||
}
|
||||
|
||||
// Request the tip
|
||||
get_block_request(
|
||||
&mut sync,
|
||||
FromBlock::Hash(fork_blocks.last().unwrap().hash()),
|
||||
1,
|
||||
&peer_id1,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user