Prioritize new blocks over old forks when syncing (#4414)

* Prioritize new blocks over old forks when syncing

* Fixed some test cases
This commit is contained in:
Arkadiy Paronyan
2020-01-09 19:00:57 +01:00
committed by Gavin Wood
parent 6d8b99cf5e
commit 6e572a9477
4 changed files with 157 additions and 119 deletions
+142 -110
View File
@@ -352,6 +352,11 @@ impl<B: BlockT> ChainSync<B> {
}
}
/// Number of active sync requests.
pub fn num_sync_requests(&self) -> usize {
self.fork_targets.len()
}
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
@@ -473,7 +478,7 @@ impl<B: BlockT> ChainSync<B> {
debug!(
target: "sync",
"Explicit sync request for block {:?} with no peers specified. \
Syncing from all connected peers {:?} instead.",
Syncing from all connected peers {:?} instead.",
hash, peers,
);
@@ -588,7 +593,27 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Peer {} is busy", id);
return None
}
if let Some((hash, req)) = fork_sync_request(
if let Some((range, req)) = peer_block_request(
id,
peer,
blocks,
attrs,
max_parallel,
last_finalized,
best_queued,
) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(
target: "sync",
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
have_requests = true;
Some((id.clone(), req))
} else if let Some((hash, req)) = fork_sync_request(
id,
fork_targets,
best_queued,
@@ -604,25 +629,6 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::DownloadingStale(hash);
have_requests = true;
Some((id.clone(), req))
} else if let Some((range, req)) = peer_block_request(
id,
peer,
blocks,
attrs,
max_parallel,
last_finalized
) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(
target: "sync",
"New block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
have_requests = true;
Some((id.clone(), req))
} else {
None
}
@@ -636,111 +642,127 @@ impl<B: BlockT> ChainSync<B> {
/// Handle a response from the remote to a block request that we made.
///
/// `request` must be the original request that triggered `response`.
/// or `None` if data comes from the block announcement.
///
/// If this corresponds to a valid block, this outputs the block that
/// must be imported in the import queue.
pub fn on_block_data
(&mut self, who: PeerId, request: BlockRequest<B>, response: BlockResponse<B>) -> Result<OnBlockData<B>, BadPeer>
(&mut self, who: PeerId, request: Option<BlockRequest<B>>, response: BlockResponse<B>) -> Result<OnBlockData<B>, BadPeer>
{
let new_blocks: Vec<IncomingBlock<B>> =
if let Some(peer) = self.peers.get_mut(&who) {
let mut blocks = response.blocks;
if request.direction == message::Direction::Descending {
if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse()
}
self.is_idle = false;
match &mut peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(&who);
self.blocks.insert(*start_block, blocks, who);
peer.state = PeerSyncState::Available;
self.blocks
.drain(self.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
if request.is_some() {
match &mut peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(&who);
self.blocks.insert(*start_block, blocks, who);
peer.state = PeerSyncState::Available;
self.blocks
.drain(self.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
IncomingBlock {
hash: block_data.block.hash,
header: block_data.block.header,
body: block_data.block.body,
justification: block_data.block.justification,
origin: block_data.origin,
allow_missing_state: true,
import_existing: false,
}
}).collect()
}
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
blocks.into_iter().map(|b| {
IncomingBlock {
hash: block_data.block.hash,
header: block_data.block.header,
body: block_data.block.body,
justification: block_data.block.justification,
origin: block_data.origin,
allow_missing_state: false,
hash: b.hash,
header: b.header,
body: b.body,
justification: b.justification,
origin: Some(who.clone()),
allow_missing_state: true,
import_existing: false,
}
}).collect()
}
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
header: b.header,
body: b.body,
justification: b.justification,
origin: Some(who.clone()),
allow_missing_state: true,
import_existing: false,
}
}).collect()
}
PeerSyncState::AncestorSearch(num, state) => {
let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) {
(Some(block), Ok(maybe_our_block_hash)) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who);
maybe_our_block_hash.filter(|x| x == &block.hash)
},
(None, _) => {
debug!(target: "sync", "Invalid response when searching for ancestor from {}", who);
return Err(BadPeer(who, rep::UNKNOWN_ANCESTOR))
},
(_, Err(e)) => {
info!("Error answering legitimate blockchain query: {:?}", e);
return Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR))
}
};
if matching_hash.is_some() && peer.common_number < *num {
peer.common_number = *num;
}
if matching_hash.is_none() && num.is_zero() {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
return Err(BadPeer(who, rep::GENESIS_MISMATCH))
}
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) {
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
} else {
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
// add it to sync targets if necessary.
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
self.best_queued_hash,
self.best_queued_number,
peer.best_hash,
peer.best_number,
matching_hash,
peer.common_number,
);
if peer.common_number < peer.best_number
&& peer.best_number < self.best_queued_number
{
trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who);
self.fork_targets
.entry(peer.best_hash.clone())
.or_insert_with(|| ForkTarget {
number: peer.best_number,
parent_hash: None,
peers: Default::default(),
})
.peers.insert(who);
PeerSyncState::AncestorSearch(num, state) => {
let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) {
(Some(block), Ok(maybe_our_block_hash)) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who);
maybe_our_block_hash.filter(|x| x == &block.hash)
},
(None, _) => {
debug!(target: "sync", "Invalid response when searching for ancestor from {}", who);
return Err(BadPeer(who, rep::UNKNOWN_ANCESTOR))
},
(_, Err(e)) => {
info!("Error answering legitimate blockchain query: {:?}", e);
return Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR))
}
};
if matching_hash.is_some() && peer.common_number < *num {
peer.common_number = *num;
}
if matching_hash.is_none() && num.is_zero() {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
return Err(BadPeer(who, rep::GENESIS_MISMATCH))
}
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) {
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
} else {
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
// add it to sync targets if necessary.
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
self.best_queued_hash,
self.best_queued_number,
peer.best_hash,
peer.best_number,
matching_hash,
peer.common_number,
);
if peer.common_number < peer.best_number
&& peer.best_number < self.best_queued_number
{
trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who);
self.fork_targets
.entry(peer.best_hash.clone())
.or_insert_with(|| ForkTarget {
number: peer.best_number,
parent_hash: None,
peers: Default::default(),
})
.peers.insert(who);
}
peer.state = PeerSyncState::Available;
Vec::new()
}
peer.state = PeerSyncState::Available;
Vec::new()
}
}
| PeerSyncState::Available
| PeerSyncState::DownloadingJustification(..)
| PeerSyncState::DownloadingFinalityProof(..) => Vec::new()
| PeerSyncState::Available
| PeerSyncState::DownloadingJustification(..)
| PeerSyncState::DownloadingFinalityProof(..) => Vec::new()
}
} else {
// When request.is_none() just acccept blocks
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
header: b.header,
body: b.body,
justification: b.justification,
origin: Some(who.clone()),
allow_missing_state: true,
import_existing: false,
}
}).collect()
}
} else {
Vec::new()
@@ -1255,10 +1277,15 @@ fn peer_block_request<B: BlockT>(
attrs: &message::BlockAttributes,
max_parallel_downloads: u32,
finalized: NumberFor<B>,
best_num: NumberFor<B>,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
if peer.common_number < finalized {
return None;
}
if best_num >= peer.best_number {
// Will be downloaded as alternative fork instead.
return None;
}
if let Some(range) = blocks.needed_blocks(
id.clone(),
MAX_BLOCKS_TO_REQUEST,
@@ -1291,11 +1318,16 @@ fn fork_sync_request<B: BlockT>(
check_block: impl Fn(&B::Hash) -> BlockStatus,
) -> Option<(B::Hash, BlockRequest<B>)>
{
targets.retain(|hash, r| if r.number > finalized {
targets.retain(|hash, r| {
if r.number <= finalized {
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number);
return false;
}
if check_block(hash) != BlockStatus::Unknown {
trace!(target: "sync", "Removed obsolete fork sync request {:?} (#{})", hash, r.number);
return false;
}
true
} else {
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number);
false
});
for (hash, r) in targets {
if !r.peers.contains(id) {