Fixed a couple of syncing issues (#5277)

* Don't queue duplicate blocks

* Keep queue_blocks on restart
This commit is contained in:
Arkadiy Paronyan
2020-03-19 14:02:07 +01:00
committed by GitHub
parent cfa179f2ce
commit a66615446d
5 changed files with 84 additions and 20 deletions
+32 -20
View File
@@ -85,7 +85,7 @@ mod rep {
pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
/// Reputation change for peers which send us a block which we fail to verify.
pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 20), "Block verification failed");
pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
/// Reputation change for peers which send us a known bad block.
pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
@@ -138,6 +138,8 @@ pub struct ChainSync<B: BlockT> {
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
max_parallel_downloads: u32,
/// Total number of processed blocks (imported or failed).
processed_blocks: usize,
}
/// All the data we have about a Peer that we are trying to sync with
@@ -318,6 +320,7 @@ impl<B: BlockT> ChainSync<B> {
is_idle: false,
block_announce_validator,
max_parallel_downloads,
processed_blocks: 0,
}
}
@@ -357,6 +360,11 @@ impl<B: BlockT> ChainSync<B> {
self.fork_targets.len()
}
/// Number of processed blocks.
pub fn num_processed_blocks(&self) -> usize {
self.processed_blocks
}
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
@@ -649,7 +657,7 @@ impl<B: BlockT> ChainSync<B> {
pub fn on_block_data
(&mut self, who: PeerId, request: Option<BlockRequest<B>>, response: BlockResponse<B>) -> Result<OnBlockData<B>, BadPeer>
{
let new_blocks: Vec<IncomingBlock<B>> =
let mut new_blocks: Vec<IncomingBlock<B>> =
if let Some(peer) = self.peers.get_mut(&who) {
let mut blocks = response.blocks;
if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) {
@@ -768,6 +776,12 @@ impl<B: BlockT> ChainSync<B> {
Vec::new()
};
let orig_len = new_blocks.len();
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
if new_blocks.len() != orig_len {
debug!(target: "sync", "Ignoring {} blocks that are already queued", orig_len - new_blocks.len());
}
let is_recent = new_blocks.first()
.map(|block| {
self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash))
@@ -895,10 +909,12 @@ impl<B: BlockT> ChainSync<B> {
let mut output = Vec::new();
let mut has_error = false;
let mut hashes = vec![];
for (result, hash) in results {
hashes.push(hash);
for (_, hash) in &results {
self.queue_blocks.remove(&hash);
}
self.processed_blocks += results.len();
for (result, hash) in results {
if has_error {
continue;
}
@@ -943,43 +959,39 @@ impl<B: BlockT> ChainSync<B> {
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
info!("Peer sent block with incomplete header to import");
warn!("Peer sent block with incomplete header to import");
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
output.extend(self.restart());
}
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
info!("Verification failed from peer: {}", e);
warn!("Verification failed for block {:?} received from peer: {}, {:?}", hash, peer, e);
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
output.extend(self.restart());
}
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!("Block received from peer has been blacklisted");
info!("Block {:?} received from peer {} has been blacklisted", hash, peer);
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
output.extend(self.restart());
}
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
// in the meantime because other chain has been finalized.
// Don't mark it as bad as it still may be synced if explicitly requested.
trace!(target: "sync", "Obsolete block");
trace!(target: "sync", "Obsolete block {:?}", hash);
},
Err(BlockImportError::UnknownParent) |
Err(BlockImportError::Cancelled) |
Err(BlockImportError::Other(_)) => {
e @ Err(BlockImportError::UnknownParent) |
e @ Err(BlockImportError::Other(_)) => {
warn!(target: "sync", "Error importing block {:?}: {:?}", hash, e);
output.extend(self.restart());
},
Err(BlockImportError::Cancelled) => {}
};
}
for hash in hashes {
self.queue_blocks.remove(&hash);
}
self.is_idle = false;
output.into_iter()
}
@@ -1094,9 +1106,9 @@ impl<B: BlockT> ChainSync<B> {
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
return OnBlockAnnounce::Nothing
}
// If the announced block is the best they have seen, our common number
// If the announced block is the best they have and is not ahead of us, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
if is_best && self.best_queued_number >= number {
if known {
peer.common_number = number
} else if header.parent_hash() == &self.best_queued_hash || known_parent {
@@ -1168,7 +1180,7 @@ impl<B: BlockT> ChainSync<B> {
/// Restart the sync process.
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
self.queue_blocks.clear();
self.processed_blocks = 0;
self.blocks.clear();
let info = self.client.info();
self.best_queued_hash = info.best_hash;