mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 16:51:03 +00:00
Enable parallel block download (#4014)
This commit is contained in:
committed by
Gavin Wood
parent
093f1c46e5
commit
4ca6f8d1b2
@@ -392,7 +392,8 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
state: PeerSyncState::Available,
|
state: PeerSyncState::Available,
|
||||||
recently_announced: Default::default(),
|
recently_announced: Default::default(),
|
||||||
});
|
});
|
||||||
return Ok(self.select_new_blocks(who).map(|(_, req)| req))
|
self.is_idle = false;
|
||||||
|
return Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
let common_best = std::cmp::min(self.best_queued_number, info.best_number);
|
let common_best = std::cmp::min(self.best_queued_number, info.best_number);
|
||||||
@@ -567,6 +568,7 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
trace!(target: "sync", "Too many blocks in the queue.");
|
trace!(target: "sync", "Too many blocks in the queue.");
|
||||||
return Either::Left(std::iter::empty())
|
return Either::Left(std::iter::empty())
|
||||||
}
|
}
|
||||||
|
let major_sync = self.status().state == SyncState::Downloading;
|
||||||
let blocks = &mut self.blocks;
|
let blocks = &mut self.blocks;
|
||||||
let attrs = &self.required_block_attributes;
|
let attrs = &self.required_block_attributes;
|
||||||
let fork_targets = &self.fork_targets;
|
let fork_targets = &self.fork_targets;
|
||||||
@@ -596,7 +598,7 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
peer.state = PeerSyncState::DownloadingStale(hash);
|
peer.state = PeerSyncState::DownloadingStale(hash);
|
||||||
have_requests = true;
|
have_requests = true;
|
||||||
Some((id.clone(), req))
|
Some((id.clone(), req))
|
||||||
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) {
|
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) {
|
||||||
peer.state = PeerSyncState::DownloadingNew(range.start);
|
peer.state = PeerSyncState::DownloadingNew(range.start);
|
||||||
trace!(target: "sync", "New block request for {}", id);
|
trace!(target: "sync", "New block request for {}", id);
|
||||||
have_requests = true;
|
have_requests = true;
|
||||||
@@ -1123,39 +1125,6 @@ impl<B: BlockT> ChainSync<B> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Select a range of new blocks to download from the given peer.
|
|
||||||
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
|
|
||||||
// when there are too many blocks in the queue => do not try to download new blocks
|
|
||||||
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
|
|
||||||
trace!(target: "sync", "Too many blocks in the queue.");
|
|
||||||
return None
|
|
||||||
}
|
|
||||||
|
|
||||||
let peer = self.peers.get_mut(&who)?;
|
|
||||||
|
|
||||||
if !peer.state.is_available() {
|
|
||||||
trace!(target: "sync", "Peer {} is busy", who);
|
|
||||||
return None
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
target: "sync",
|
|
||||||
"Considering new block download from {}, common block is {}, best is {:?}",
|
|
||||||
who,
|
|
||||||
peer.common_number,
|
|
||||||
peer.best_number
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Some((range, req)) = peer_block_request(&who, peer, &mut self.blocks, &self.required_block_attributes) {
|
|
||||||
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end);
|
|
||||||
peer.state = PeerSyncState::DownloadingNew(range.start);
|
|
||||||
Some((range, req))
|
|
||||||
} else {
|
|
||||||
trace!(target: "sync", "Nothing to request from {}", who);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// What is the status of the block corresponding to the given hash?
|
/// What is the status of the block corresponding to the given hash?
|
||||||
fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
|
fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
|
||||||
if self.queue_blocks.contains(hash) {
|
if self.queue_blocks.contains(hash) {
|
||||||
@@ -1254,8 +1223,16 @@ fn peer_block_request<B: BlockT>(
|
|||||||
peer: &PeerSync<B>,
|
peer: &PeerSync<B>,
|
||||||
blocks: &mut BlockCollection<B>,
|
blocks: &mut BlockCollection<B>,
|
||||||
attrs: &message::BlockAttributes,
|
attrs: &message::BlockAttributes,
|
||||||
|
major_sync: bool,
|
||||||
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
|
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
|
||||||
if let Some(range) = blocks.needed_blocks(id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) {
|
let max_parallel = if major_sync { 1 } else { 3 };
|
||||||
|
if let Some(range) = blocks.needed_blocks(
|
||||||
|
id.clone(),
|
||||||
|
MAX_BLOCKS_TO_REQUEST,
|
||||||
|
peer.best_number,
|
||||||
|
peer.common_number,
|
||||||
|
max_parallel,
|
||||||
|
) {
|
||||||
let request = message::generic::BlockRequest {
|
let request = message::generic::BlockRequest {
|
||||||
id: 0,
|
id: 0,
|
||||||
fields: attrs.clone(),
|
fields: attrs.clone(),
|
||||||
|
|||||||
@@ -24,8 +24,6 @@ use libp2p::PeerId;
|
|||||||
use sr_primitives::traits::{Block as BlockT, NumberFor, One};
|
use sr_primitives::traits::{Block as BlockT, NumberFor, One};
|
||||||
use crate::message;
|
use crate::message;
|
||||||
|
|
||||||
const MAX_PARALLEL_DOWNLOADS: u32 = 1;
|
|
||||||
|
|
||||||
/// Block data with origin.
|
/// Block data with origin.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct BlockData<B: BlockT> {
|
pub struct BlockData<B: BlockT> {
|
||||||
@@ -84,9 +82,7 @@ impl<B: BlockT> BlockCollection<B> {
|
|||||||
|
|
||||||
match self.blocks.get(&start) {
|
match self.blocks.get(&start) {
|
||||||
Some(&BlockRangeState::Downloading { .. }) => {
|
Some(&BlockRangeState::Downloading { .. }) => {
|
||||||
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
|
trace!(target: "sync", "Inserting block data still marked as being downloaded: {}", start);
|
||||||
debug_assert!(false);
|
|
||||||
return;
|
|
||||||
},
|
},
|
||||||
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
|
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
|
||||||
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
|
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
|
||||||
@@ -100,8 +96,15 @@ impl<B: BlockT> BlockCollection<B> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
|
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
|
||||||
pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>)
|
pub fn needed_blocks(
|
||||||
-> Option<Range<NumberFor<B>>> {
|
&mut self,
|
||||||
|
who: PeerId,
|
||||||
|
count: usize,
|
||||||
|
peer_best: NumberFor<B>,
|
||||||
|
common: NumberFor<B>,
|
||||||
|
max_parallel: u32,
|
||||||
|
) -> Option<Range<NumberFor<B>>>
|
||||||
|
{
|
||||||
// First block number that we need to download
|
// First block number that we need to download
|
||||||
let first_different = common + <NumberFor<B>>::one();
|
let first_different = common + <NumberFor<B>>::one();
|
||||||
let count = (count as u32).into();
|
let count = (count as u32).into();
|
||||||
@@ -112,7 +115,7 @@ impl<B: BlockT> BlockCollection<B> {
|
|||||||
let next = downloading_iter.next();
|
let next = downloading_iter.next();
|
||||||
break match &(prev, next) {
|
break match &(prev, next) {
|
||||||
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
|
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
|
||||||
if downloading < MAX_PARALLEL_DOWNLOADS =>
|
if downloading < max_parallel =>
|
||||||
(*start .. *start + *len, downloading),
|
(*start .. *start + *len, downloading),
|
||||||
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
|
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
|
||||||
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
|
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
|
||||||
@@ -185,7 +188,6 @@ impl<B: BlockT> BlockCollection<B> {
|
|||||||
true
|
true
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
debug_assert!(false);
|
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -242,18 +244,18 @@ mod test {
|
|||||||
let peer2 = PeerId::random();
|
let peer2 = PeerId::random();
|
||||||
|
|
||||||
let blocks = generate_blocks(150);
|
let blocks = generate_blocks(150);
|
||||||
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(1 .. 41));
|
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1), Some(1 .. 41));
|
||||||
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(41 .. 81));
|
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1), Some(41 .. 81));
|
||||||
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0), Some(81 .. 121));
|
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0, 1), Some(81 .. 121));
|
||||||
|
|
||||||
bc.clear_peer_download(&peer1);
|
bc.clear_peer_download(&peer1);
|
||||||
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
|
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
|
||||||
assert_eq!(bc.drain(1), vec![]);
|
assert_eq!(bc.drain(1), vec![]);
|
||||||
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(121 .. 151));
|
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1), Some(121 .. 151));
|
||||||
bc.clear_peer_download(&peer0);
|
bc.clear_peer_download(&peer0);
|
||||||
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());
|
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());
|
||||||
|
|
||||||
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(11 .. 41));
|
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1), Some(11 .. 41));
|
||||||
assert_eq!(bc.drain(1), blocks[1..11].iter()
|
assert_eq!(bc.drain(1), blocks[1..11].iter()
|
||||||
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>());
|
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>());
|
||||||
|
|
||||||
@@ -267,7 +269,7 @@ mod test {
|
|||||||
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);
|
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);
|
||||||
|
|
||||||
bc.clear_peer_download(&peer2);
|
bc.clear_peer_download(&peer2);
|
||||||
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80), Some(81 .. 121));
|
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80, 1), Some(81 .. 121));
|
||||||
bc.clear_peer_download(&peer2);
|
bc.clear_peer_download(&peer2);
|
||||||
bc.insert(81, blocks[81..121].to_vec(), peer2.clone());
|
bc.insert(81, blocks[81..121].to_vec(), peer2.clone());
|
||||||
bc.clear_peer_download(&peer1);
|
bc.clear_peer_download(&peer1);
|
||||||
@@ -292,7 +294,7 @@ mod test {
|
|||||||
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
|
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
|
||||||
|
|
||||||
let peer0 = PeerId::random();
|
let peer0 = PeerId::random();
|
||||||
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000), Some(1 .. 100));
|
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000, 1), Some(1 .. 100));
|
||||||
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600), Some(100 + 128 .. 100 + 128 + 128));
|
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600, 1), Some(100 + 128 .. 100 + 128 + 128));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user