Fix lots of small nits in sc-network (#6028)

* Fix lots of small nits in sc-network

* Update client/network/src/protocol/sync/blocks.rs

Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>

* Fix warning

* Yes. The line width.

Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>
This commit is contained in:
Pierre Krieger
2020-05-15 13:58:52 +02:00
committed by GitHub
parent 1dbd761192
commit 6ecdf20a1f
14 changed files with 111 additions and 112 deletions
@@ -507,7 +507,7 @@ impl GenericProto {
///
/// Can be called multiple times with the same `PeerId`s.
pub fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
self.peerset.discovered(peer_ids.map(|peer_id| {
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
peer_id
}));
@@ -616,8 +616,8 @@ impl GenericProto {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \
until {:?}", occ_entry.key(), until);
*occ_entry.into_mut() = PeerState::PendingRequest {
timer: futures_timer::Delay::new(until.clone() - now),
timer_deadline: until.clone(),
timer: futures_timer::Delay::new(*until - now),
timer_deadline: *until,
};
},
@@ -639,8 +639,8 @@ impl GenericProto {
occ_entry.key(), banned);
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
open,
timer: futures_timer::Delay::new(banned.clone() - now),
timer_deadline: banned.clone(),
timer: futures_timer::Delay::new(*banned - now),
timer_deadline: *banned,
};
},
@@ -879,7 +879,7 @@ impl NetworkBehaviour for GenericProto {
// this peer", and not "banned" in the sense that we would refuse the peer altogether.
(st @ &mut PeerState::Poisoned, endpoint @ ConnectedPoint::Listener { .. }) |
(st @ &mut PeerState::Banned { .. }, endpoint @ ConnectedPoint::Listener { .. }) => {
let incoming_id = self.next_incoming_index.clone();
let incoming_id = self.next_incoming_index;
self.next_incoming_index.0 = match self.next_incoming_index.0.checked_add(1) {
Some(v) => v,
None => {
@@ -1200,7 +1200,7 @@ impl NetworkBehaviour for GenericProto {
debug!(target: "sub-libp2p", "External API <= Closed({:?})", source);
let event = GenericProtoOut::CustomProtocolClosed {
reason,
peer_id: source.clone(),
peer_id: source,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
} else {
@@ -1384,7 +1384,7 @@ impl NetworkBehaviour for GenericProto {
*peer_state = PeerState::Enabled { open };
}
st @ _ => *peer_state = st,
st => *peer_state = st,
}
}
@@ -483,35 +483,35 @@ impl ProtocolsHandler for NotifsHandler {
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
if let Poll::Ready(ev) = self.legacy.poll(cx) {
return match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open { endpoint }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Closed { endpoint, reason }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Clogged { messages }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::ProtocolError { is_severe, error }
)),
ProtocolsHandlerEvent::Close(err) =>
return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))),
Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))),
}
}
@@ -390,8 +390,8 @@ pub enum NotificationsOutError {
/// Remote doesn't process our messages quickly enough.
///
/// > **Note**: This is not necessarily the remote's fault, and could also be caused by the
/// > local node sending data too quickly. Properly doing back-pressure, however,
/// > would require a deep refactoring effort in Substrate as a whole.
/// > local node sending data too quickly. Properly doing back-pressure, however,
/// > would require a deep refactoring effort in Substrate as a whole.
Clogged,
}
+39 -35
View File
@@ -574,7 +574,7 @@ impl<B: BlockT> ChainSync<B> {
if number > peer.best_number {
peer.best_number = number;
peer.best_hash = hash.clone();
peer.best_hash = *hash;
}
self.pending_requests.add(peer_id);
}
@@ -639,7 +639,7 @@ impl<B: BlockT> ChainSync<B> {
}
/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ {
pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ {
if self.pending_requests.is_empty() {
return Either::Left(std::iter::empty())
}
@@ -682,7 +682,7 @@ impl<B: BlockT> ChainSync<B> {
req,
);
have_requests = true;
Some((id.clone(), req))
Some((id, req))
} else if let Some((hash, req)) = fork_sync_request(
id,
fork_targets,
@@ -698,7 +698,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
peer.state = PeerSyncState::DownloadingStale(hash);
have_requests = true;
Some((id.clone(), req))
Some((id, req))
} else {
None
}
@@ -713,24 +713,27 @@ impl<B: BlockT> ChainSync<B> {
///
/// If this corresponds to a valid block, this outputs the block that
/// must be imported in the import queue.
pub fn on_block_data
(&mut self, who: PeerId, request: Option<BlockRequest<B>>, response: BlockResponse<B>) -> Result<OnBlockData<B>, BadPeer>
{
pub fn on_block_data(
&mut self,
who: &PeerId,
request: Option<BlockRequest<B>>,
response: BlockResponse<B>
) -> Result<OnBlockData<B>, BadPeer> {
let mut new_blocks: Vec<IncomingBlock<B>> =
if let Some(peer) = self.peers.get_mut(&who) {
if let Some(peer) = self.peers.get_mut(who) {
let mut blocks = response.blocks;
if request.as_ref().map_or(false, |r| r.direction == message::Direction::Descending) {
trace!(target: "sync", "Reversing incoming block list");
blocks.reverse()
}
self.pending_requests.add(&who);
self.pending_requests.add(who);
if request.is_some() {
match &mut peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(&who);
self.blocks.clear_peer_download(who);
let start_block = *start_block;
peer.state = PeerSyncState::Available;
validate_blocks::<B>(&blocks, &who)?;
validate_blocks::<B>(&blocks, who)?;
self.blocks.insert(start_block, blocks, who.clone());
self.blocks
.drain(self.best_queued_number + One::one())
@@ -751,9 +754,9 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::Available;
if blocks.is_empty() {
debug!(target: "sync", "Empty block response from {}", who);
return Err(BadPeer(who, rep::NO_BLOCK));
return Err(BadPeer(who.clone(), rep::NO_BLOCK));
}
validate_blocks::<B>(&blocks, &who)?;
validate_blocks::<B>(&blocks, who)?;
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
@@ -774,11 +777,11 @@ impl<B: BlockT> ChainSync<B> {
},
(None, _) => {
debug!(target: "sync", "Invalid response when searching for ancestor from {}", who);
return Err(BadPeer(who, rep::UNKNOWN_ANCESTOR))
return Err(BadPeer(who.clone(), rep::UNKNOWN_ANCESTOR))
},
(_, Err(e)) => {
info!("❌ Error answering legitimate blockchain query: {:?}", e);
return Err(BadPeer(who, rep::BLOCKCHAIN_READ_ERROR))
return Err(BadPeer(who.clone(), rep::BLOCKCHAIN_READ_ERROR))
}
};
if matching_hash.is_some() {
@@ -794,7 +797,7 @@ impl<B: BlockT> ChainSync<B> {
}
if matching_hash.is_none() && current.is_zero() {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
return Err(BadPeer(who, rep::GENESIS_MISMATCH))
return Err(BadPeer(who.clone(), rep::GENESIS_MISMATCH))
}
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) {
peer.state = PeerSyncState::AncestorSearch {
@@ -802,7 +805,7 @@ impl<B: BlockT> ChainSync<B> {
start: *start,
state: next_state,
};
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
return Ok(OnBlockData::Request(who.clone(), ancestry_request::<B>(next_num)))
} else {
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
// add it to sync targets if necessary.
@@ -838,7 +841,7 @@ impl<B: BlockT> ChainSync<B> {
}
} else {
// When request.is_none() this is a block announcement. Just accept blocks.
validate_blocks::<B>(&blocks, &who)?;
validate_blocks::<B>(&blocks, who)?;
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
@@ -869,7 +872,7 @@ impl<B: BlockT> ChainSync<B> {
// So the only way this can happen is when peers lie about the
// common block.
debug!(target: "sync", "Ignoring known blocks from {}", who);
return Err(BadPeer(who, rep::KNOWN_BLOCK));
return Err(BadPeer(who.clone(), rep::KNOWN_BLOCK));
}
let orig_len = new_blocks.len();
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
@@ -1124,7 +1127,7 @@ impl<B: BlockT> ChainSync<B> {
/// Updates our internal state for best queued block and then goes
/// through all peers to update our view of their state as well.
fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
if let Some(_) = self.fork_targets.remove(&hash) {
if self.fork_targets.remove(&hash).is_some() {
trace!(target: "sync", "Completed fork sync {:?}", hash);
}
if number > self.best_queued_number {
@@ -1162,7 +1165,7 @@ impl<B: BlockT> ChainSync<B> {
/// header (call `on_block_data`). The network request isn't sent
/// in this case. Both hash and header is passed as an optimization
/// to avoid rehashing the header.
pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce<B::Header>, is_best: bool)
pub fn on_block_announce(&mut self, who: &PeerId, hash: B::Hash, announce: &BlockAnnounce<B::Header>, is_best: bool)
-> OnBlockAnnounce
{
let header = &announce.header;
@@ -1177,7 +1180,7 @@ impl<B: BlockT> ChainSync<B> {
let ancient_parent = parent_status == BlockStatus::InChainPruned;
let known = self.is_known(&hash);
let peer = if let Some(peer) = self.peers.get_mut(&who) {
let peer = if let Some(peer) = self.peers.get_mut(who) {
peer
} else {
error!(target: "sync", "💔 Called on_block_announce with a bad peer ID");
@@ -1206,13 +1209,13 @@ impl<B: BlockT> ChainSync<B> {
peer.common_number = number - One::one();
}
}
self.pending_requests.add(&who);
self.pending_requests.add(who);
// known block case
if known || self.is_already_downloading(&hash) {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
if let Some(target) = self.fork_targets.get_mut(&hash) {
target.peers.insert(who);
target.peers.insert(who.clone());
}
return OnBlockAnnounce::Nothing
}
@@ -1251,21 +1254,21 @@ impl<B: BlockT> ChainSync<B> {
.entry(hash.clone())
.or_insert_with(|| ForkTarget {
number,
parent_hash: Some(header.parent_hash().clone()),
parent_hash: Some(*header.parent_hash()),
peers: Default::default(),
})
.peers.insert(who);
.peers.insert(who.clone());
}
OnBlockAnnounce::Nothing
}
/// Call when a peer has disconnected.
pub fn peer_disconnected(&mut self, who: PeerId) {
self.blocks.clear_peer_download(&who);
self.peers.remove(&who);
self.extra_justifications.peer_disconnected(&who);
self.extra_finality_proofs.peer_disconnected(&who);
pub fn peer_disconnected(&mut self, who: &PeerId) {
self.blocks.clear_peer_download(who);
self.peers.remove(who);
self.extra_justifications.peer_disconnected(who);
self.extra_finality_proofs.peer_disconnected(who);
self.pending_requests.set_all();
}
@@ -1471,11 +1474,12 @@ fn fork_sync_request<B: BlockT>(
}
if r.number <= best_num {
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
let mut count = (r.number - finalized).saturated_into::<u32>(); // up to the last finalized block
if parent_status != BlockStatus::Unknown {
let count = if parent_status == BlockStatus::Unknown {
(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
} else {
// request only single block
count = 1;
}
1
};
trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count);
return Some((hash.clone(), message::generic::BlockRequest {
id: 0,
@@ -18,7 +18,6 @@
use std::cmp;
use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use std::collections::hash_map::Entry;
use log::trace;
use libp2p::PeerId;
use sp_runtime::traits::{Block as BlockT, NumberFor, One};
@@ -117,17 +116,17 @@ impl<B: BlockT> BlockCollection<B> {
let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
loop {
let next = downloading_iter.next();
break match &(prev, next) {
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
break match (prev, next) {
(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
if downloading < max_parallel =>
(*start .. *start + *len, downloading),
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
&(Some((start, r)), None) =>
(Some((start, r)), None) =>
(*start + r.len() .. *start + r.len() + count, 0), // last range
&(None, None) =>
(None, None) =>
(first_different .. first_different + count, 0), // empty
&(None, Some((start, _))) if *start > first_different =>
(None, Some((start, _))) if *start > first_different =>
(first_different .. cmp::min(first_different + count, *start), 0), // gap at the start
_ => {
prev = next;
@@ -168,7 +167,7 @@ impl<B: BlockT> BlockCollection<B> {
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
BlockRangeState::Complete(blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
// Remove all elements from `blocks` and add them to `drained`
drained.append(blocks);
@@ -186,26 +185,22 @@ impl<B: BlockT> BlockCollection<B> {
}
pub fn clear_peer_download(&mut self, who: &PeerId) {
match self.peer_requests.entry(who.clone()) {
Entry::Occupied(entry) => {
let start = entry.remove();
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
*downloading = *downloading - 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => {
true
},
_ => {
false
}
};
if remove {
self.blocks.remove(&start);
if let Some(start) = self.peer_requests.remove(who) {
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
*downloading -= 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => {
true
},
_ => {
false
}
},
_ => (),
};
if remove {
self.blocks.remove(&start);
}
}
}
}
@@ -103,11 +103,9 @@ impl<B: BlockT> ExtraRequests<B> {
// we have finalized further than the given request, presumably
// by some other part of the system (not sync). we can safely
// ignore the `Revert` error.
return;
},
Err(err) => {
debug!(target: "sync", "Failed to insert request {:?} into tree: {:?}", request, err);
return;
}
_ => ()
}