Turn a SmallVec into VecDeque for performances (#6091)

* Turn a SmallVec into VecDeque for performances

* Fix the other SmallVecs
This commit is contained in:
Pierre Krieger
2020-05-20 21:12:49 +02:00
committed by GitHub
parent 717fa95bc7
commit c8339f9694
4 changed files with 58 additions and 59 deletions
@@ -34,7 +34,7 @@ use prometheus_endpoint::HistogramVec;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
use std::{borrow::Cow, cmp, collections::hash_map::Entry};
use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}};
use std::{error, mem, pin::Pin, str, time::Duration};
use wasm_timer::Instant;
@@ -135,7 +135,7 @@ pub struct GenericProto {
next_incoming_index: sc_peerset::IncomingIndex,
/// Events to produce from `poll()`.
events: SmallVec<[NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>; 4]>,
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>>,
/// If `Some`, report the message queue sizes on this `Histogram`.
queue_size_report: Option<HistogramVec>,
@@ -340,7 +340,7 @@ impl GenericProto {
peers: FnvHashMap::default(),
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: SmallVec::new(),
events: VecDeque::new(),
queue_size_report,
}
}
@@ -374,7 +374,7 @@ impl GenericProto {
// Send an event to all the peers we're connected to, updating the handshake message.
for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) {
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::UpdateHandshake {
@@ -446,7 +446,7 @@ impl GenericProto {
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id);
self.peerset.dropped(peer_id.clone());
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Disable,
@@ -471,7 +471,7 @@ impl GenericProto {
inc.alive = false;
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Disable,
@@ -562,7 +562,7 @@ impl GenericProto {
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::One(conn),
event: NotifsHandlerIn::SendNotification {
@@ -592,7 +592,7 @@ impl GenericProto {
trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::One(conn),
event: NotifsHandlerIn::SendLegacy {
@@ -614,7 +614,7 @@ impl GenericProto {
// If there's no entry in `self.peers`, start dialing.
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Starting to connect", entry.key());
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", entry.key());
self.events.push(NetworkBehaviourAction::DialPeer {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: entry.key().clone(),
condition: DialPeerCondition::Disconnected
});
@@ -638,7 +638,7 @@ impl GenericProto {
PeerState::Banned { .. } => {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Starting to connect", occ_entry.key());
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key());
self.events.push(NetworkBehaviourAction::DialPeer {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: occ_entry.key().clone(),
condition: DialPeerCondition::Disconnected
});
@@ -662,7 +662,7 @@ impl GenericProto {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Enabling connections.",
occ_entry.key());
debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key());
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: occ_entry.key().clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Enable,
@@ -681,7 +681,7 @@ impl GenericProto {
incoming for incoming peer")
}
debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", occ_entry.key());
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: occ_entry.key().clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Enable,
@@ -746,7 +746,7 @@ impl GenericProto {
PeerState::Enabled { open } => {
debug!(target: "sub-libp2p", "PSM => Drop({:?}): Disabling connections.", entry.key());
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", entry.key());
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: entry.key().clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Disable,
@@ -801,7 +801,7 @@ impl GenericProto {
debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Enabling connections.",
index, incoming.peer_id);
debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", incoming.peer_id);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: incoming.peer_id,
handler: NotifyHandler::All,
event: NotifsHandlerIn::Enable,
@@ -834,7 +834,7 @@ impl GenericProto {
debug!(target: "sub-libp2p", "PSM => Reject({:?}, {:?}): Rejecting connections.",
index, incoming.peer_id);
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", incoming.peer_id);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: incoming.peer_id,
handler: NotifyHandler::All,
event: NotifsHandlerIn::Disable,
@@ -881,7 +881,7 @@ impl NetworkBehaviour for GenericProto {
peer_id, endpoint
);
*st = PeerState::Enabled { open: SmallVec::new() };
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*conn),
event: NotifsHandlerIn::Enable
@@ -925,7 +925,7 @@ impl NetworkBehaviour for GenericProto {
"Libp2p => Connected({},{:?}): Not requested by PSM, disabling.",
peer_id, endpoint);
*st = PeerState::Disabled { open: SmallVec::new(), banned_until };
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*conn),
event: NotifsHandlerIn::Disable
@@ -941,7 +941,7 @@ impl NetworkBehaviour for GenericProto {
(PeerState::Enabled { .. }, _) => {
debug!(target: "sub-libp2p", "Handler({},{:?}) <= Enable secondary connection",
peer_id, conn);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*conn),
event: NotifsHandlerIn::Enable
@@ -951,7 +951,7 @@ impl NetworkBehaviour for GenericProto {
(PeerState::Disabled { .. }, _) | (PeerState::DisabledPendingEnable { .. }, _) => {
debug!(target: "sub-libp2p", "Handler({},{:?}) <= Disable secondary connection",
peer_id, conn);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::One(*conn),
event: NotifsHandlerIn::Disable
@@ -979,7 +979,7 @@ impl NetworkBehaviour for GenericProto {
reason: "Disconnected by libp2p".into(),
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}
_ => {}
@@ -1144,7 +1144,7 @@ impl NetworkBehaviour for GenericProto {
debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source);
debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", source);
self.peerset.dropped(source.clone());
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Disable,
@@ -1216,7 +1216,7 @@ impl NetworkBehaviour for GenericProto {
reason,
peer_id: source,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
debug!(target: "sub-libp2p", "Secondary connection closed custom protocol.");
}
@@ -1254,7 +1254,7 @@ impl NetworkBehaviour for GenericProto {
if first {
debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source };
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
debug!(target: "sub-libp2p", "Secondary connection opened custom protocol.");
}
@@ -1269,7 +1269,7 @@ impl NetworkBehaviour for GenericProto {
message,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
NotifsHandlerOut::Notification { protocol_name, message } => {
@@ -1287,7 +1287,7 @@ impl NetworkBehaviour for GenericProto {
message,
};
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
NotifsHandlerOut::Clogged { messages } => {
@@ -1296,7 +1296,7 @@ impl NetworkBehaviour for GenericProto {
trace!(target: "sub-libp2p", "External API <= Clogged({:?})", source);
warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \
pretty large", source);
self.events.push(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged {
self.events.push_back(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged {
peer_id: source,
messages,
}));
@@ -1335,6 +1335,10 @@ impl NetworkBehaviour for GenericProto {
Self::OutEvent,
>,
> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
// Poll for instructions from the peerset.
// Note that the peerset is a *best effort* crate, and we have to use defensive programming.
loop {
@@ -1368,7 +1372,7 @@ impl NetworkBehaviour for GenericProto {
}
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id);
self.events.push(NetworkBehaviourAction::DialPeer {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
condition: DialPeerCondition::Disconnected
});
@@ -1390,7 +1394,7 @@ impl NetworkBehaviour for GenericProto {
}
debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id);
self.events.push(NetworkBehaviourAction::NotifyHandler {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::Enable,
@@ -1402,8 +1406,8 @@ impl NetworkBehaviour for GenericProto {
}
}
if !self.events.is_empty() {
return Poll::Ready(self.events.remove(0))
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
@@ -30,7 +30,7 @@ use libp2p::swarm::{
};
use log::{debug, error};
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, error, fmt, io, mem, time::Duration};
use std::{borrow::Cow, collections::VecDeque, error, fmt, io, mem, time::Duration};
use std::{pin::Pin, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
@@ -117,7 +117,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto {
substreams: SmallVec::new(),
init_deadline: Delay::new(Duration::from_secs(20))
},
events_queue: SmallVec::new(),
events_queue: VecDeque::new(),
}
}
}
@@ -142,7 +142,7 @@ pub struct LegacyProtoHandler {
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>; 16]>,
events_queue: VecDeque<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>>,
}
/// State of the handler.
@@ -277,7 +277,7 @@ impl LegacyProtoHandler {
ProtocolState::Init { substreams: incoming, .. } => {
if incoming.is_empty() {
if let ConnectedPoint::Dialer { .. } = self.endpoint {
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
});
@@ -290,7 +290,7 @@ impl LegacyProtoHandler {
version: incoming[0].protocol_version(),
endpoint: self.endpoint.clone()
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: incoming.into_iter().collect(),
shutdown: SmallVec::new()
@@ -488,7 +488,7 @@ impl LegacyProtoHandler {
version: substream.protocol_version(),
endpoint: self.endpoint.clone()
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: smallvec![substream],
shutdown: SmallVec::new()
@@ -565,7 +565,7 @@ impl ProtocolsHandler for LegacyProtoHandler {
_ => false,
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError {
is_severe,
error: Box::new(err),
}));
@@ -587,8 +587,7 @@ impl ProtocolsHandler for LegacyProtoHandler {
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
// Flush the events queue if necessary.
if !self.events_queue.is_empty() {
let event = self.events_queue.remove(0);
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
@@ -37,8 +37,7 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{error, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, fmt, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
@@ -70,7 +69,7 @@ pub struct NotifsInHandler {
///
/// This queue is only ever modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>; 16]>,
events_queue: VecDeque<ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>>,
}
/// Event that can be received by a `NotifsInHandler`.
@@ -130,7 +129,7 @@ impl IntoProtocolsHandler for NotifsInHandlerProto {
in_protocol: self.in_protocol,
substream: None,
pending_accept_refuses: 0,
events_queue: SmallVec::new(),
events_queue: VecDeque::new(),
}
}
}
@@ -160,7 +159,7 @@ impl ProtocolsHandler for NotifsInHandler {
) {
// If a substream already exists, we drop it and replace it with the new incoming one.
if self.substream.is_some() {
self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
}
// Note that we drop the existing substream, which will send an equivalent to a TCP "RST"
@@ -171,7 +170,7 @@ impl ProtocolsHandler for NotifsInHandler {
// and we can't close "more" than that anyway.
self.substream = Some(proto);
self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg)));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg)));
self.pending_accept_refuses = self.pending_accept_refuses
.checked_add(1)
.unwrap_or_else(|| {
@@ -233,8 +232,7 @@ impl ProtocolsHandler for NotifsInHandler {
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
// Flush the events queue if necessary.
if !self.events_queue.is_empty() {
let event = self.events_queue.remove(0);
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
@@ -35,8 +35,7 @@ use libp2p::swarm::{
};
use log::{debug, warn, error};
use prometheus_endpoint::Histogram;
use smallvec::SmallVec;
use std::{borrow::Cow, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration};
use std::{borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
/// Maximum duration to open a substream and receive the handshake message. After that, we
@@ -85,7 +84,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
when_connection_open: Instant::now(),
queue_size_report: self.queue_size_report,
state: State::Disabled,
events_queue: SmallVec::new(),
events_queue: VecDeque::new(),
peer_id: peer_id.clone(),
}
}
@@ -116,7 +115,7 @@ pub struct NotifsOutHandler {
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: SmallVec<[ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>; 16]>,
events_queue: VecDeque<ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>>,
/// Who we are connected to.
peer_id: PeerId,
@@ -247,7 +246,7 @@ impl ProtocolsHandler for NotifsOutHandler {
match mem::replace(&mut self.state, State::Poisoned) {
State::Opening { initial_message } => {
let ev = NotifsOutHandlerOut::Open { handshake: handshake_msg };
self.events_queue.push(ProtocolsHandlerEvent::Custom(ev));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev));
self.state = State::Open { substream, initial_message };
},
// If the handler was disabled while we were negotiating the protocol, immediately
@@ -267,7 +266,7 @@ impl ProtocolsHandler for NotifsOutHandler {
match mem::replace(&mut self.state, State::Poisoned) {
State::Disabled => {
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
});
@@ -287,7 +286,7 @@ impl ProtocolsHandler for NotifsOutHandler {
}
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
});
@@ -347,7 +346,7 @@ impl ProtocolsHandler for NotifsOutHandler {
State::Opening { .. } => {
self.state = State::Refused;
let ev = NotifsOutHandlerOut::Refused;
self.events_queue.push(ProtocolsHandlerEvent::Custom(ev));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev));
},
State::DisabledOpening => self.state = State::Disabled,
State::Poisoned => error!("☎️ Notifications handler in a poisoned state"),
@@ -371,9 +370,8 @@ impl ProtocolsHandler for NotifsOutHandler {
cx: &mut Context,
) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
// Flush the events queue if necessary.
if !self.events_queue.is_empty() {
let event = self.events_queue.remove(0);
return Poll::Ready(event);
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
match &mut self.state {
@@ -385,7 +383,7 @@ impl ProtocolsHandler for NotifsOutHandler {
let initial_message = mem::replace(initial_message, Vec::new());
self.state = State::Opening { initial_message: initial_message.clone() };
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message);
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
});