mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 15:47:58 +00:00
More robust sync (#5604)
* More robust ancestry search * Punish peers for being on the wrong fork * Update client/network/src/protocol/sync.rs Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -90,6 +90,9 @@ mod rep {
|
||||
/// Reputation change for peers which send us a known bad block.
|
||||
pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
|
||||
|
||||
/// Reputation change for peers which send us a known block.
|
||||
pub const KNOWN_BLOCK: Rep = Rep::new(-(1 << 29), "Duplicate block");
|
||||
|
||||
/// Reputation change for peers which send us a block with bad justifications.
|
||||
pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
|
||||
|
||||
@@ -184,7 +187,11 @@ pub enum PeerSyncState<B: BlockT> {
|
||||
/// Available for sync requests.
|
||||
Available,
|
||||
/// Searching for ancestors the Peer has in common with us.
|
||||
AncestorSearch(NumberFor<B>, AncestorSearchState<B>),
|
||||
AncestorSearch {
|
||||
start: NumberFor<B>,
|
||||
current: NumberFor<B>,
|
||||
state: AncestorSearchState<B>,
|
||||
},
|
||||
/// Actively downloading new blocks, starting from the given Number.
|
||||
DownloadingNew(NumberFor<B>),
|
||||
/// Downloading a stale block with given Hash. Stale means that it is a
|
||||
@@ -432,10 +439,11 @@ impl<B: BlockT> ChainSync<B> {
|
||||
common_number: Zero::zero(),
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::AncestorSearch(
|
||||
common_best,
|
||||
AncestorSearchState::ExponentialBackoff(One::one())
|
||||
),
|
||||
state: PeerSyncState::AncestorSearch {
|
||||
current: common_best,
|
||||
start: self.best_queued_number,
|
||||
state: AncestorSearchState::ExponentialBackoff(One::one()),
|
||||
},
|
||||
recently_announced: Default::default()
|
||||
});
|
||||
self.is_idle = false;
|
||||
@@ -508,7 +516,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
self.is_idle = false;
|
||||
for peer_id in &peers {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
||||
if let PeerSyncState::AncestorSearch {..} = peer.state {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -669,7 +677,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
match &mut peer.state {
|
||||
PeerSyncState::DownloadingNew(start_block) => {
|
||||
self.blocks.clear_peer_download(&who);
|
||||
self.blocks.insert(*start_block, blocks, who);
|
||||
self.blocks.insert(*start_block, blocks, who.clone());
|
||||
peer.state = PeerSyncState::Available;
|
||||
self.blocks
|
||||
.drain(self.best_queued_number + One::one())
|
||||
@@ -700,10 +708,10 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
}).collect()
|
||||
}
|
||||
PeerSyncState::AncestorSearch(num, state) => {
|
||||
let matching_hash = match (blocks.get(0), self.client.hash(*num)) {
|
||||
PeerSyncState::AncestorSearch { current, start, state } => {
|
||||
let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
|
||||
(Some(block), Ok(maybe_our_block_hash)) => {
|
||||
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who);
|
||||
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", current, block.hash, who);
|
||||
maybe_our_block_hash.filter(|x| x == &block.hash)
|
||||
},
|
||||
(None, _) => {
|
||||
@@ -715,15 +723,27 @@ impl<B: BlockT> ChainSync<B> {
|
||||
return Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR))
|
||||
}
|
||||
};
|
||||
if matching_hash.is_some() && peer.common_number < *num {
|
||||
peer.common_number = *num;
|
||||
if matching_hash.is_some() {
|
||||
if *start < self.best_queued_number && self.best_queued_number <= peer.best_number {
|
||||
// We've made progress on this chain since the search was started.
|
||||
// Opportunistically set common number to updated number
|
||||
// instead of the one that started the search.
|
||||
peer.common_number = self.best_queued_number;
|
||||
}
|
||||
else if peer.common_number < *current {
|
||||
peer.common_number = *current;
|
||||
}
|
||||
}
|
||||
if matching_hash.is_none() && num.is_zero() {
|
||||
if matching_hash.is_none() && current.is_zero() {
|
||||
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
|
||||
return Err(BadPeer(who, rep::GENESIS_MISMATCH))
|
||||
}
|
||||
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) {
|
||||
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
|
||||
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) {
|
||||
peer.state = PeerSyncState::AncestorSearch {
|
||||
current: next_num,
|
||||
start: *start,
|
||||
state: next_state,
|
||||
};
|
||||
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
|
||||
} else {
|
||||
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
|
||||
@@ -747,7 +767,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
parent_hash: None,
|
||||
peers: Default::default(),
|
||||
})
|
||||
.peers.insert(who);
|
||||
.peers.insert(who.clone());
|
||||
}
|
||||
peer.state = PeerSyncState::Available;
|
||||
Vec::new()
|
||||
@@ -776,18 +796,28 @@ impl<B: BlockT> ChainSync<B> {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let orig_len = new_blocks.len();
|
||||
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
|
||||
if new_blocks.len() != orig_len {
|
||||
debug!(target: "sync", "Ignoring {} blocks that are already queued", orig_len - new_blocks.len());
|
||||
}
|
||||
|
||||
// When doing initial sync we don't request blocks in parallel.
|
||||
// So the only way this can happen is when peers lie about the
|
||||
// common block.
|
||||
let is_recent = new_blocks.first()
|
||||
.map(|block| {
|
||||
self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash))
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
if !is_recent && new_blocks.last().map_or(false, |b| self.is_known(&b.hash)) {
|
||||
// When doing initial sync we don't request blocks in parallel.
|
||||
// So the only way this can happen is when peers lie about the
|
||||
// common block.
|
||||
debug!(target: "sync", "Ignoring known blocks from {}", who);
|
||||
return Err(BadPeer(who, rep::KNOWN_BLOCK));
|
||||
}
|
||||
let orig_len = new_blocks.len();
|
||||
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
|
||||
if new_blocks.len() != orig_len {
|
||||
debug!(target: "sync", "Ignoring {} blocks that are already queued", orig_len - new_blocks.len());
|
||||
}
|
||||
|
||||
let origin =
|
||||
if is_recent {
|
||||
BlockOrigin::NetworkBroadcast
|
||||
@@ -1043,7 +1073,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
self.best_queued_hash = *hash;
|
||||
// Update common blocks
|
||||
for (n, peer) in self.peers.iter_mut() {
|
||||
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
||||
if let PeerSyncState::AncestorSearch {..} = peer.state {
|
||||
// Wait for ancestry search to complete first.
|
||||
continue;
|
||||
}
|
||||
@@ -1103,7 +1133,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
peer.best_number = number;
|
||||
peer.best_hash = hash;
|
||||
}
|
||||
if let PeerSyncState::AncestorSearch(_, _) = peer.state {
|
||||
if let PeerSyncState::AncestorSearch {..} = peer.state {
|
||||
return OnBlockAnnounce::Nothing
|
||||
}
|
||||
// If the announced block is the best they have and is not ahead of us, our common number
|
||||
|
||||
Reference in New Issue
Block a user