Fix sync stalling on moving head (best block) prior to the tip of the chain sometimes (#4091)

* Work around finalization woes

* Fixed check_block

* Added a test
This commit is contained in:
Arkadiy Paronyan
2019-11-12 13:54:11 +01:00
committed by Gavin Wood
parent c91d42712a
commit d40abed4a5
6 changed files with 184 additions and 55 deletions
+8 -8
View File
@@ -1009,12 +1009,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(),
}, who.clone(), status.roles, status.best_number);
match self.sync.new_peer(who.clone(), info) {
Ok(None) => (),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
}
}
}
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
@@ -1341,12 +1343,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
let peers = self.context_data.peers.clone();
let results = self.sync.on_blocks_processed(
imported,
count,
results,
|peer_id| peers.get(peer_id).map(|i| i.info.clone())
);
for result in results {
match result {
+34 -34
View File
@@ -37,7 +37,6 @@ use crate::{
config::{Roles, BoxFinalityProofRequestBuilder},
message::{self, generic::FinalityProofRequest, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse,
FinalityProofResponse},
protocol
};
use either::Either;
use extra_requests::ExtraRequests;
@@ -347,23 +346,22 @@ impl<B: BlockT> ChainSync<B> {
/// Handle a new connected peer.
///
/// Call this method whenever we connect to a new peer.
pub fn new_peer(&mut self, who: PeerId, info: protocol::PeerInfo<B>) -> Result<Option<BlockRequest<B>>, BadPeer> {
pub fn new_peer(&mut self, who: PeerId, best_hash: B::Hash, best_number: NumberFor<B>)
-> Result<Option<BlockRequest<B>>, BadPeer>
{
// There is nothing sync can get from the node that has no blockchain data.
if !info.roles.is_full() {
return Ok(None)
}
match self.block_status(&info.best_hash) {
match self.block_status(&best_hash) {
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
Err(BadPeer(who, BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE))
}
Ok(BlockStatus::KnownBad) => {
info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
info!("New peer with known bad best block {} ({}).", best_hash, best_number);
Err(BadPeer(who, i32::min_value()))
}
Ok(BlockStatus::Unknown) => {
if info.best_number.is_zero() {
info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
if best_number.is_zero() {
info!("New peer with unknown genesis hash {} ({}).", best_hash, best_number);
return Err(BadPeer(who, i32::min_value()))
}
// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
@@ -378,8 +376,8 @@ impl<B: BlockT> ChainSync<B> {
);
self.peers.insert(who, PeerSync {
common_number: self.best_queued_number,
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default()
});
@@ -388,11 +386,11 @@ impl<B: BlockT> ChainSync<B> {
// If we are at genesis, just start downloading.
if self.best_queued_number.is_zero() {
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
@@ -400,18 +398,18 @@ impl<B: BlockT> ChainSync<B> {
return Ok(None)
}
let common_best = std::cmp::min(self.best_queued_number, info.best_number);
let common_best = std::cmp::min(self.best_queued_number, best_number);
debug!(target:"sync",
"New peer with unknown best hash {} ({}), searching for common ancestor.",
info.best_hash,
info.best_number
best_hash,
best_number
);
self.peers.insert(who, PeerSync {
common_number: Zero::zero(),
best_hash: info.best_hash,
best_number: info.best_number,
best_hash,
best_number,
state: PeerSyncState::AncestorSearch(
common_best,
AncestorSearchState::ExponentialBackoff(One::one())
@@ -423,11 +421,11 @@ impl<B: BlockT> ChainSync<B> {
Ok(Some(ancestry_request::<B>(common_best)))
}
Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with known best hash {} ({}).", best_hash, best_number);
self.peers.insert(who.clone(), PeerSync {
common_number: info.best_number,
best_hash: info.best_hash,
best_number: info.best_number,
common_number: best_number,
best_hash,
best_number,
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
@@ -849,7 +847,6 @@ impl<B: BlockT> ChainSync<B> {
imported: usize,
count: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
mut peer_info: impl FnMut(&PeerId) -> Option<protocol::PeerInfo<B>>
) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
trace!(target: "sync", "Imported {} of {}", imported, count);
@@ -902,27 +899,33 @@ impl<B: BlockT> ChainSync<B> {
if let Some(peer) = who {
info!("Peer sent block with incomplete header to import");
output.push(Err(BadPeer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
info!("Verification failed from peer: {}", e);
output.push(Err(BadPeer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
info!("Bad block");
output.push(Err(BadPeer(peer, BAD_BLOCK_REPUTATION_CHANGE)));
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
}
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
// in the meantime becasue other chain has been finalized.
// Don't mark it as bad as it still may be synced if explicitly requested.
trace!(target: "sync", "Obsolete block");
},
Err(BlockImportError::UnknownParent) |
Err(BlockImportError::Cancelled) |
Err(BlockImportError::Other(_)) => {
output.extend(self.restart(&mut peer_info));
output.extend(self.restart());
},
};
}
@@ -1121,9 +1124,7 @@ impl<B: BlockT> ChainSync<B> {
}
/// Restart the sync process.
fn restart<'a, F>
(&'a mut self, mut peer_info: F) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
where F: FnMut(&PeerId) -> Option<protocol::PeerInfo<B>> + 'a
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
{
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
@@ -1134,9 +1135,8 @@ impl<B: BlockT> ChainSync<B> {
self.is_idle = false;
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::replace(&mut self.peers, HashMap::new());
old_peers.into_iter().filter_map(move |(id, _)| {
let info = peer_info(&id)?;
match self.new_peer(id.clone(), info) {
old_peers.into_iter().filter_map(move |(id, p)| {
match self.new_peer(id.clone(), p.best_hash, p.best_number) {
Ok(None) => None,
Ok(Some(x)) => Some(Ok((id, x))),
Err(e) => Some(Err(e))