Optimize collecting pending block requests (#5829)

* Optimized collecting pending block requests

* Make sure request iterator is consumed
This commit is contained in:
Arkadiy Paronyan
2020-04-30 09:50:08 +02:00
committed by GitHub
parent 48832fedc3
commit fdde90077b
2 changed files with 72 additions and 25 deletions
+66 -22
View File
@@ -106,6 +106,50 @@ mod rep {
pub const UNKNOWN_ANCESTOR:Rep = Rep::new(-(1 << 16), "DB Error");
}
enum PendingRequests {
Some(HashSet<PeerId>),
All,
}
impl PendingRequests {
fn add(&mut self, id: &PeerId) {
match self {
PendingRequests::Some(set) => {
set.insert(id.clone());
}
PendingRequests::All => {},
}
}
fn take(&mut self) -> PendingRequests {
std::mem::replace(self, Default::default())
}
fn set_all(&mut self) {
*self = PendingRequests::All;
}
fn contains(&self, id: &PeerId) -> bool {
match self {
PendingRequests::Some(set) => set.contains(id),
PendingRequests::All => true,
}
}
fn is_empty(&self) -> bool {
match self {
PendingRequests::Some(set) => set.is_empty(),
PendingRequests::All => false,
}
}
}
impl Default for PendingRequests {
fn default() -> Self {
PendingRequests::Some(HashSet::default())
}
}
/// The main data structure which contains all the state for a chains
/// active syncing strategy.
pub struct ChainSync<B: BlockT> {
@@ -138,8 +182,8 @@ pub struct ChainSync<B: BlockT> {
request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
/// Fork sync targets.
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
/// A flag that caches idle state with no pending requests.
is_idle: bool,
/// A set of peers for which there might be potential block requests
pending_requests: PendingRequests,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
@@ -327,7 +371,7 @@ impl<B: BlockT> ChainSync<B> {
queue_blocks: Default::default(),
request_builder,
fork_targets: Default::default(),
is_idle: false,
pending_requests: Default::default(),
block_announce_validator,
max_parallel_downloads,
processed_blocks: 0,
@@ -426,7 +470,7 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.is_idle = false;
self.pending_requests.add(&who);
return Ok(None)
}
@@ -438,6 +482,7 @@ impl<B: BlockT> ChainSync<B> {
best_number
);
self.pending_requests.add(&who);
self.peers.insert(who, PeerSync {
common_number: Zero::zero(),
best_hash,
@@ -449,7 +494,6 @@ impl<B: BlockT> ChainSync<B> {
},
recently_announced: Default::default()
});
self.is_idle = false;
Ok(Some(ancestry_request::<B>(common_best)))
}
@@ -462,7 +506,7 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
self.is_idle = false;
self.pending_requests.add(&who);
Ok(None)
}
}
@@ -516,7 +560,6 @@ impl<B: BlockT> ChainSync<B> {
}
trace!(target: "sync", "Downloading requested old fork {:?}", hash);
self.is_idle = false;
for peer_id in &peers {
if let Some(peer) = self.peers.get_mut(peer_id) {
if let PeerSyncState::AncestorSearch {..} = peer.state {
@@ -527,6 +570,7 @@ impl<B: BlockT> ChainSync<B> {
peer.best_number = number;
peer.best_hash = hash.clone();
}
self.pending_requests.add(peer_id);
}
}
@@ -590,7 +634,7 @@ impl<B: BlockT> ChainSync<B> {
/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ {
if self.is_idle {
if self.pending_requests.is_empty() {
return Either::Left(std::iter::empty())
}
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
@@ -606,10 +650,13 @@ impl<B: BlockT> ChainSync<B> {
let best_queued = self.best_queued_number;
let client = &self.client;
let queue = &self.queue_blocks;
let pending_requests = self.pending_requests.take();
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() {
trace!(target: "sync", "Peer {} is busy", id);
return None
}
if !pending_requests.contains(id) {
return None
}
if let Some((range, req)) = peer_block_request(
@@ -652,9 +699,6 @@ impl<B: BlockT> ChainSync<B> {
None
}
});
if !have_requests {
self.is_idle = true;
}
Either::Right(iter)
}
@@ -675,7 +719,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse()
}
self.is_idle = false;
self.pending_requests.add(&who);
if request.is_some() {
match &mut peer.state {
PeerSyncState::DownloadingNew(start_block) => {
@@ -859,7 +903,7 @@ impl<B: BlockT> ChainSync<B> {
return Ok(OnBlockJustification::Nothing)
};
self.is_idle = false;
self.pending_requests.add(&who);
if let PeerSyncState::DownloadingJustification(hash) = peer.state {
peer.state = PeerSyncState::Available;
@@ -906,7 +950,7 @@ impl<B: BlockT> ChainSync<B> {
return Ok(OnBlockFinalityProof::Nothing)
};
self.is_idle = false;
self.pending_requests.add(&who);
if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
peer.state = PeerSyncState::Available;
@@ -1029,7 +1073,7 @@ impl<B: BlockT> ChainSync<B> {
};
}
self.is_idle = false;
self.pending_requests.set_all();
output.into_iter()
}
@@ -1038,12 +1082,12 @@ impl<B: BlockT> ChainSync<B> {
pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
self.extra_justifications.try_finalize_root((hash, number), finalization_result, true);
self.is_idle = false;
self.pending_requests.set_all();
}
pub fn on_finality_proof_import(&mut self, req: (B::Hash, NumberFor<B>), res: Result<(B::Hash, NumberFor<B>), ()>) {
self.extra_finality_proofs.try_finalize_root(req, res, true);
self.is_idle = false;
self.pending_requests.set_all();
}
/// Notify about finalization of the given block.
@@ -1101,7 +1145,7 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = new_common_number;
}
}
self.is_idle = false;
self.pending_requests.set_all();
}
/// Call when a node announces a new block.
@@ -1154,7 +1198,7 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = number - One::one();
}
}
self.is_idle = false;
self.pending_requests.add(&who);
// known block case
if known || self.is_already_downloading(&hash) {
@@ -1214,7 +1258,7 @@ impl<B: BlockT> ChainSync<B> {
self.peers.remove(&who);
self.extra_justifications.peer_disconnected(&who);
self.extra_finality_proofs.peer_disconnected(&who);
self.is_idle = false;
self.pending_requests.set_all();
}
/// Restart the sync process.
@@ -1224,7 +1268,7 @@ impl<B: BlockT> ChainSync<B> {
let info = self.client.info();
self.best_queued_hash = info.best_hash;
self.best_queued_number = std::cmp::max(info.best_number, self.best_imported_number);
self.is_idle = false;
self.pending_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::replace(&mut self.peers, HashMap::new());
old_peers.into_iter().filter_map(move |(id, p)| {