Don't iterate over peers in generic_proto::behaviour::poll (#6142)

* Don't iterate over peers in generic_proto::behaviour::poll

* Improve comment

* Rework to use DelayIds
This commit is contained in:
Pierre Krieger
2020-05-26 21:23:32 +02:00
committed by GitHub
parent 288953942e
commit 9906a2e39d
@@ -126,6 +126,18 @@ pub struct GenericProto {
/// List of peers in our state.
peers: FnvHashMap<PeerId, PeerState>,
/// The elements in `peers` occasionally contain `Delay` objects that we would normally have
/// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is
/// instead put inside of `delays` and reference by a [`DelayId`]. This stream
/// yields `PeerId`s whose `DelayId` is potentially ready.
///
/// By design, we never remove elements from this list. Elements are removed only when the
/// `Delay` triggers. As such, this stream may produce obsolete elements.
delays: stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId)> + Send>>>,
/// [`DelayId`] to assign to the next delay.
next_delay_id: DelayId,
/// List of incoming messages we have sent to the peer set manager and that are waiting for an
/// answer.
incoming: SmallVec<[IncomingPeer; 6]>,
@@ -141,6 +153,10 @@ pub struct GenericProto {
queue_size_report: Option<HistogramVec>,
}
/// Identifier for a delay firing.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
/// State of a peer we're connected to.
#[derive(Debug)]
enum PeerState {
@@ -158,8 +174,8 @@ enum PeerState {
/// The peerset requested that we connect to this peer. We are currently not connected.
PendingRequest {
/// When to actually start dialing.
timer: futures_timer::Delay,
/// When to actually start dialing. References an entry in `delays`.
timer: DelayId,
/// When the `timer` will trigger.
timer_deadline: Instant,
},
@@ -183,8 +199,8 @@ enum PeerState {
DisabledPendingEnable {
/// The connections that are currently open for custom protocol traffic.
open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>,
/// When to enable this remote.
timer: futures_timer::Delay,
/// When to enable this remote. References an entry in `delays`.
timer: DelayId,
/// When the `timer` will trigger.
timer_deadline: Instant,
},
@@ -338,6 +354,8 @@ impl GenericProto {
notif_protocols: Vec::new(),
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
next_delay_id: DelayId(0),
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: VecDeque::new(),
@@ -627,10 +645,20 @@ impl GenericProto {
match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
PeerState::Banned { ref until } if *until > now => {
let peer_id = occ_entry.key().clone();
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \
until {:?}", occ_entry.key(), until);
until {:?}", peer_id, until);
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(*until - now);
self.delays.push(async move {
delay.await;
(delay_id, peer_id)
}.boxed());
*occ_entry.into_mut() = PeerState::PendingRequest {
timer: futures_timer::Delay::new(*until - now),
timer: delay_id,
timer_deadline: *until,
};
},
@@ -649,11 +677,21 @@ impl GenericProto {
open,
banned_until: Some(ref banned)
} if *banned > now => {
let peer_id = occ_entry.key().clone();
debug!(target: "sub-libp2p", "PSM => Connect({:?}): But peer is banned until {:?}",
occ_entry.key(), banned);
peer_id, banned);
let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(*banned - now);
self.delays.push(async move {
delay.await;
(delay_id, peer_id)
}.boxed());
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
open,
timer: futures_timer::Delay::new(*banned - now),
timer: delay_id,
timer_deadline: *banned,
};
},
@@ -1363,34 +1401,37 @@ impl NetworkBehaviour for GenericProto {
}
}
for (peer_id, peer_state) in self.peers.iter_mut() {
match peer_state {
PeerState::PendingRequest { timer, .. } => {
if let Poll::Pending = Pin::new(timer).poll(cx) {
continue;
}
while let Poll::Ready(Some((delay_id, peer_id))) =
Pin::new(&mut self.delays).poll_next(cx) {
let peer_state = match self.peers.get_mut(&peer_id) {
Some(s) => s,
// We intentionally never remove elements from `delays`, and it may
// thus contain peers which are now gone. This is a normal situation.
None => continue,
};
match peer_state {
PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id);
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
peer_id,
condition: DialPeerCondition::Disconnected
});
*peer_state = PeerState::Requested;
}
PeerState::DisabledPendingEnable { timer, open, .. } => {
if let Poll::Pending = Pin::new(timer).poll(cx) {
continue;
}
PeerState::DisabledPendingEnable { timer, open, .. } if *timer == delay_id => {
debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
peer_id,
handler: NotifyHandler::All,
event: NotifsHandlerIn::Enable,
});
*peer_state = PeerState::Enabled { open: mem::replace(open, Default::default()) };
}
// We intentionally never remove elements from `delays`, and it may
// thus contain obsolete entries. This is a normal situation.
_ => {},
}
}