mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 11:07:56 +00:00
Drain blocks on peer disconnect (#8553)
* Drain blocks on peer disconnect * Finish comment * Fixed test * Update client/network/src/protocol/sync.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -738,10 +738,19 @@ impl<B: BlockT> ChainSync<B> {
|
||||
// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from the
|
||||
// common number, the peer best number is higher than our best queued and the common
|
||||
// number is smaller than the last finalized block number, we should do an ancestor
|
||||
// search to find a better common block.
|
||||
// search to find a better common block. If the queue is full we wait till all blocks are
|
||||
// imported though.
|
||||
if best_queued.saturating_sub(peer.common_number) > MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
|
||||
&& best_queued < peer.best_number && peer.common_number < last_finalized
|
||||
&& queue.len() <= MAJOR_SYNC_BLOCKS.into()
|
||||
{
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
|
||||
id,
|
||||
peer.common_number,
|
||||
best_queued,
|
||||
);
|
||||
let current = std::cmp::min(peer.best_number, best_queued);
|
||||
peer.state = PeerSyncState::AncestorSearch {
|
||||
current,
|
||||
@@ -804,7 +813,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
response: BlockResponse<B>
|
||||
) -> Result<OnBlockData<B>, BadPeer> {
|
||||
self.downloaded_blocks += response.blocks.len();
|
||||
let mut new_blocks: Vec<IncomingBlock<B>> =
|
||||
let new_blocks: Vec<IncomingBlock<B>> =
|
||||
if let Some(peer) = self.peers.get_mut(who) {
|
||||
let mut blocks = response.blocks;
|
||||
if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) {
|
||||
@@ -970,6 +979,13 @@ impl<B: BlockT> ChainSync<B> {
|
||||
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED));
|
||||
};
|
||||
|
||||
Ok(self.validate_and_queue_blocks(new_blocks))
|
||||
}
|
||||
|
||||
fn validate_and_queue_blocks(
|
||||
&mut self,
|
||||
mut new_blocks: Vec<IncomingBlock<B>>,
|
||||
) -> OnBlockData<B> {
|
||||
let orig_len = new_blocks.len();
|
||||
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
|
||||
if new_blocks.len() != orig_len {
|
||||
@@ -992,10 +1008,8 @@ impl<B: BlockT> ChainSync<B> {
|
||||
);
|
||||
self.on_block_queued(h, n)
|
||||
}
|
||||
|
||||
self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
|
||||
|
||||
Ok(OnBlockData::Import(origin, new_blocks))
|
||||
OnBlockData::Import(origin, new_blocks)
|
||||
}
|
||||
|
||||
/// Handle a response from the remote to a justification request that we made.
|
||||
@@ -1353,7 +1367,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
PreValidateBlockAnnounce::Failure { who, disconnect }
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
debug!(
|
||||
target: "sync",
|
||||
"💔 Block announcement validation of block {:?} errored: {}",
|
||||
hash,
|
||||
@@ -1542,11 +1556,34 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
|
||||
/// Call when a peer has disconnected.
|
||||
pub fn peer_disconnected(&mut self, who: &PeerId) {
|
||||
/// Canceled obsolete block request may result in some blocks being ready for
|
||||
/// import, so this functions checks for such blocks and returns them.
|
||||
pub fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> {
|
||||
self.blocks.clear_peer_download(who);
|
||||
self.peers.remove(who);
|
||||
self.extra_justifications.peer_disconnected(who);
|
||||
self.pending_requests.set_all();
|
||||
let blocks: Vec<_> = self.blocks
|
||||
.drain(self.best_queued_number + One::one())
|
||||
.into_iter()
|
||||
.map(|block_data| {
|
||||
let justifications =
|
||||
legacy_justification_mapping(block_data.block.justification);
|
||||
IncomingBlock {
|
||||
hash: block_data.block.hash,
|
||||
header: block_data.block.header,
|
||||
body: block_data.block.body,
|
||||
justifications,
|
||||
origin: block_data.origin,
|
||||
allow_missing_state: true,
|
||||
import_existing: false,
|
||||
}
|
||||
}).collect();
|
||||
if !blocks.is_empty() {
|
||||
Some(self.validate_and_queue_blocks(blocks))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Restart the sync process. This will reset all pending block requests and return an iterator
|
||||
@@ -2349,6 +2386,9 @@ mod test {
|
||||
.for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap());
|
||||
}
|
||||
|
||||
// "Wait" for the queue to clear
|
||||
sync.queue_blocks.clear();
|
||||
|
||||
// Let peer2 announce that it finished syncing
|
||||
send_block_announce(best_block.header().clone(), &peer_id2, &mut sync);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user