Add a back-pressure-friendly alternative to NetworkService::write_notifications 🎉 (#6692)

* Add NetworkService::send_notifications

* Doc

* Doc

* API adjustment

* Address concerns

* Make it compile

* Start implementation

* Progress in the implementation

* Change implementation strategy again

* More work before weekend

* Finish changes

* Minor doc fix

* Revert some minor changes

* Apply suggestions from code review

* GroupError -> NotifsHandlerError

* Apply suggestions from code review

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* state_transition_waker -> close_waker

* Apply suggestions from code review

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Finish renames in service.rs

* More renames

* More review suggestions applied

* More review addressing

* Final change

* 512 -> 2048

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
Pierre Krieger
2020-07-29 13:23:19 +02:00
committed by GitHub
parent e674d64a72
commit 1ab7719314
12 changed files with 955 additions and 431 deletions
@@ -21,7 +21,7 @@
//! network, then performs the Substrate protocol handling on top.
pub use self::behaviour::{GenericProto, GenericProtoOut};
pub use self::handler::LegacyConnectionKillError;
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready, LegacyConnectionKillError};
mod behaviour;
mod handler;
@@ -15,8 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::config::ProtocolId;
use crate::protocol::generic_proto::handler::{NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn};
use crate::protocol::generic_proto::upgrade::RegisteredProtocol;
use crate::protocol::generic_proto::{
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn},
upgrade::RegisteredProtocol
};
use bytes::BytesMut;
use fnv::FnvHashMap;
@@ -31,7 +33,6 @@ use libp2p::swarm::{
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
@@ -149,9 +150,6 @@ pub struct GenericProto {
/// Events to produce from `poll()`.
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, GenericProtoOut>>,
/// If `Some`, report the message queue sizes on this `Histogram`.
queue_size_report: Option<HistogramVec>,
}
/// Identifier for a delay firing.
@@ -189,7 +187,7 @@ enum PeerState {
/// We may still have ongoing traffic with that peer, but it should cease shortly.
Disabled {
/// The connections that are currently open for custom protocol traffic.
open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>,
open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>,
/// If `Some`, any dial attempts to this peer are delayed until the given `Instant`.
banned_until: Option<Instant>,
},
@@ -199,7 +197,7 @@ enum PeerState {
/// but should get disconnected in a few seconds.
DisabledPendingEnable {
/// The connections that are currently open for custom protocol traffic.
open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>,
open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>,
/// When to enable this remote. References an entry in `delays`.
timer: DelayId,
/// When the `timer` will trigger.
@@ -210,7 +208,7 @@ enum PeerState {
/// enabled state.
Enabled {
/// The connections that are currently open for custom protocol traffic.
open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>,
open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>,
},
/// We received an incoming connection from this peer and forwarded that
@@ -227,15 +225,15 @@ impl PeerState {
self.get_open().is_some()
}
/// Returns the connection ID of the first established connection
/// Returns the [`NotificationsSink`] of the first established connection
/// that is open for custom protocol traffic.
fn get_open(&self) -> Option<ConnectionId> {
fn get_open(&self) -> Option<&NotificationsSink> {
match self {
PeerState::Disabled { open, .. } |
PeerState::DisabledPendingEnable { open, .. } |
PeerState::Enabled { open, .. } =>
if !open.is_empty() {
Some(open[0])
Some(&open[0].1)
} else {
None
}
@@ -284,9 +282,24 @@ pub enum GenericProtoOut {
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this is out of the concern of this code.
received_handshake: Vec<u8>,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
},
/// Closed a custom protocol with the remote.
/// The [`NotificationsSink`] object used to send notifications with the given peer must be
/// replaced with a new one.
///
/// This event is typically emitted when a transport-level connection is closed and we fall
/// back to a secondary connection.
CustomProtocolReplaced {
/// Id of the peer we are connected to.
peer_id: PeerId,
/// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink,
},
/// Closed a custom protocol with the remote. The existing [`NotificationsSink`] should
/// be dropped.
CustomProtocolClosed {
/// Id of the peer we were connected to.
peer_id: PeerId,
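Taken together, `CustomProtocolOpen`, `CustomProtocolReplaced` and `CustomProtocolClosed` define the lifecycle of a `NotificationsSink`. A minimal consumer sketch (the `sinks` map and the handler function are hypothetical, not part of this change):

use std::collections::HashMap;

// Hypothetical consumer state: the latest usable sink for each peer.
fn on_generic_proto_event(
    sinks: &mut HashMap<PeerId, NotificationsSink>,
    event: GenericProtoOut,
) {
    match event {
        // A sink becomes available as soon as the protocol opens.
        GenericProtoOut::CustomProtocolOpen { peer_id, notifications_sink, .. } => {
            sinks.insert(peer_id, notifications_sink);
        }
        // The previous sink is dead; further notifications must use the new one.
        GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink } => {
            sinks.insert(peer_id, notifications_sink);
        }
        // No connection remains open; the sink must be dropped.
        GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
            sinks.remove(&peer_id);
        }
        _ => {}
    }
}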
@@ -317,16 +330,12 @@ pub enum GenericProtoOut {
impl GenericProto {
/// Creates a `GenericProto`.
///
/// The `queue_size_report` is an optional Prometheus metric that can report the size of the
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
local_peer_id: PeerId,
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
queue_size_report: Option<HistogramVec>,
) -> Self {
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);
@@ -342,7 +351,6 @@ impl GenericProto {
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: VecDeque::new(),
queue_size_report,
}
}
@@ -394,6 +402,15 @@ impl GenericProto {
self.peers.get(peer_id).map(|p| p.is_open()).unwrap_or(false)
}
/// Returns the [`NotificationsSink`] that sends notifications to the given peer, or `None`
/// if the custom protocols aren't opened with this peer.
///
/// If [`GenericProto::is_open`] returns `true` for this `PeerId`, then this method is
/// guaranteed to return `Some`.
pub fn notifications_sink(&self, peer_id: &PeerId) -> Option<&NotificationsSink> {
self.peers.get(peer_id).and_then(|p| p.get_open())
}
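A hedged illustration of the guarantee documented above (the helper function is hypothetical):

// Hypothetical helper relying on the documented guarantee: when `is_open`
// returns `true`, `notifications_sink` must return `Some`.
fn send_if_open(proto: &GenericProto, peer_id: &PeerId, message: Vec<u8>) {
    if proto.is_open(peer_id) {
        proto.notifications_sink(peer_id)
            .expect("is_open() returned true; qed")
            .send_legacy(message);
    }
}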
/// Disconnects the given peer if we are connected to it.
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
debug!(target: "sub-libp2p", "External API => Disconnect {:?}", peer_id);
@@ -538,14 +555,14 @@ impl GenericProto {
message: impl Into<Vec<u8>>,
encoded_fallback_message: Vec<u8>,
) {
let conn = match self.peers.get(target).and_then(|p| p.get_open()) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
debug!(target: "sub-libp2p",
"Tried to sent notification to {:?} without an open channel.",
target);
return
},
Some(conn) => conn
Some(sink) => sink
};
trace!(
@@ -555,16 +572,11 @@ impl GenericProto {
str::from_utf8(&protocol_name)
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::One(conn),
event: NotifsHandlerIn::SendNotification {
message: message.into(),
encoded_fallback_message,
protocol_name,
},
});
notifs_sink.send_sync_notification(
&protocol_name,
encoded_fallback_message,
message
);
}
/// Sends a message to a peer.
@@ -574,25 +586,19 @@ impl GenericProto {
/// Also note that even if we have a valid open substream, it may in fact already be
/// closed without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
let conn = match self.peers.get(target).and_then(|p| p.get_open()) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
debug!(target: "sub-libp2p",
"Tried to sent packet to {:?} without an open channel.",
target);
return
}
Some(conn) => conn
Some(sink) => sink
};
trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::One(conn),
event: NotifsHandlerIn::SendLegacy {
message,
}
});
notifs_sink.send_legacy(message);
}
/// Returns the state of the peerset manager, for debugging purposes.
@@ -873,7 +879,6 @@ impl NetworkBehaviour for GenericProto {
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
self.queue_size_report.clone()
)
}
@@ -985,15 +990,26 @@ impl NetworkBehaviour for GenericProto {
// i.e. there is no connection that is open for custom protocols,
// in which case `CustomProtocolClosed` was already emitted.
let closed = open.is_empty();
open.retain(|c| c != conn);
if open.is_empty() && !closed {
debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id);
let event = GenericProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(),
reason: "Disconnected by libp2p".into(),
};
let sink_closed = open.get(0).map_or(false, |(c, _)| c == conn);
open.retain(|(c, _)| c != conn);
if !closed {
if let Some((_, sink)) = open.get(0) {
if sink_closed {
let event = GenericProtoOut::CustomProtocolReplaced {
peer_id: peer_id.clone(),
notifications_sink: sink.clone(),
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
} else {
debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id);
let event = GenericProtoOut::CustomProtocolClosed {
peer_id: peer_id.clone(),
reason: "Disconnected by libp2p".into(),
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
_ => {}
@@ -1140,9 +1156,11 @@ impl NetworkBehaviour for GenericProto {
return
};
let last = match mem::replace(entry.get_mut(), PeerState::Poisoned) {
let (last, new_notifications_sink) = match mem::replace(entry.get_mut(), PeerState::Poisoned) {
PeerState::Enabled { mut open } => {
if let Some(pos) = open.iter().position(|c| c == &connection) {
let pos = open.iter().position(|(c, _)| c == &connection);
let sink_closed = pos == Some(0);
if let Some(pos) = pos {
open.remove(pos);
} else {
debug_assert!(false);
@@ -1167,16 +1185,24 @@ impl NetworkBehaviour for GenericProto {
});
let last = open.is_empty();
let new_notifications_sink = open.iter().next().and_then(|(_, sink)|
if sink_closed {
Some(sink.clone())
} else {
None
});
*entry.into_mut() = PeerState::Disabled {
open,
banned_until: None
};
last
(last, new_notifications_sink)
},
PeerState::Disabled { mut open, banned_until } => {
if let Some(pos) = open.iter().position(|c| c == &connection) {
let pos = open.iter().position(|(c, _)| c == &connection);
let sink_closed = pos == Some(0);
if let Some(pos) = pos {
open.remove(pos);
} else {
debug_assert!(false);
@@ -1188,18 +1214,28 @@ impl NetworkBehaviour for GenericProto {
}
let last = open.is_empty();
let new_notifications_sink = open.iter().next().and_then(|(_, sink)|
if sink_closed {
Some(sink.clone())
} else {
None
});
*entry.into_mut() = PeerState::Disabled {
open,
banned_until
};
last
(last, new_notifications_sink)
},
PeerState::DisabledPendingEnable {
mut open,
timer,
timer_deadline
} => {
if let Some(pos) = open.iter().position(|c| c == &connection) {
let pos = open.iter().position(|(c, _)| c == &connection);
let sink_closed = pos == Some(0);
if let Some(pos) = pos {
open.remove(pos);
} else {
debug_assert!(false);
@@ -1211,12 +1247,20 @@ impl NetworkBehaviour for GenericProto {
}
let last = open.is_empty();
let new_notifications_sink = open.iter().next().and_then(|(_, sink)|
if sink_closed {
Some(sink.clone())
} else {
None
});
*entry.into_mut() = PeerState::DisabledPendingEnable {
open,
timer,
timer_deadline
};
last
(last, new_notifications_sink)
},
state => {
error!(target: "sub-libp2p",
@@ -1233,12 +1277,20 @@ impl NetworkBehaviour for GenericProto {
peer_id: source,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
if let Some(new_notifications_sink) = new_notifications_sink {
let event = GenericProtoOut::CustomProtocolReplaced {
peer_id: source,
notifications_sink: new_notifications_sink,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
debug!(target: "sub-libp2p", "Secondary connection closed custom protocol.");
}
}
NotifsHandlerOut::Open { endpoint, received_handshake } => {
NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink } => {
debug!(target: "sub-libp2p",
"Handler({:?}) => Endpoint {:?} open for custom protocols.",
source, endpoint);
@@ -1248,8 +1300,8 @@ impl NetworkBehaviour for GenericProto {
Some(PeerState::DisabledPendingEnable { ref mut open, .. }) |
Some(PeerState::Disabled { ref mut open, .. }) => {
let first = open.is_empty();
if !open.iter().any(|c| *c == connection) {
open.push(connection);
if !open.iter().any(|(c, _)| *c == connection) {
open.push((connection, notifications_sink.clone()));
} else {
error!(
target: "sub-libp2p",
@@ -1269,7 +1321,11 @@ impl NetworkBehaviour for GenericProto {
if first {
debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake };
let event = GenericProtoOut::CustomProtocolOpen {
peer_id: source,
received_handshake,
notifications_sink
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
@@ -15,7 +15,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub use self::group::{NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut};
pub use self::group::{
NotificationsSink, NotifsHandlerError, Ready, NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut
};
pub use self::legacy::ConnectionKillError as LegacyConnectionKillError;
mod group;
@@ -63,11 +63,21 @@ use libp2p::swarm::{
SubstreamProtocol,
NegotiatedSubstream,
};
use futures::{
channel::mpsc,
lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
prelude::*
};
use log::{debug, error};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use parking_lot::{Mutex, RwLock};
use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}};
/// Number of pending notifications in asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;
/// Number of pending notifications in synchronous contexts.
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
@@ -107,6 +117,18 @@ pub struct NotifsHandler {
/// we push the corresponding index here and process them when the handler
/// gets enabled/disabled.
pending_in: Vec<usize>,
/// If `Some`, contains the two `Receiver`s connected to the [`NotificationsSink`] that has
/// been sent out. The notifications to send out can be pulled from these receivers.
/// We use two different channels in order to have two different channel sizes, but from the
/// receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
notifications_sink_rx: Option<
stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
>
>,
}
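The select-of-fused-receivers arrangement described above can be shown in isolation. A standalone sketch, with toy capacities and payloads that are assumptions rather than code from this change:

use futures::{channel::mpsc, stream, StreamExt};

async fn merged_receivers_demo() {
    // Two bounded channels with different capacities, mirroring the async/sync split.
    let (mut async_tx, async_rx) = mpsc::channel::<u8>(8);
    let (mut sync_tx, sync_rx) = mpsc::channel::<u8>(2048);
    // From the receiving side the two channels behave as one stream. Fusing keeps
    // the merged stream usable after either sender is dropped.
    let mut merged = stream::select(async_rx.fuse(), sync_rx.fuse());

    async_tx.try_send(1).unwrap();
    sync_tx.try_send(2).unwrap();
    drop(async_tx);
    drop(sync_tx);

    // Yields 1 and 2 (in some order), then terminates since both halves ended.
    while let Some(msg) = merged.next().await {
        println!("got {}", msg);
    }
}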
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -140,6 +162,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
legacy: self.legacy.into_handler(remote_peer_id, connected_point),
enabled: EnabledState::Initial,
pending_in: Vec::new(),
notifications_sink_rx: None,
}
}
}
@@ -152,32 +175,6 @@ pub enum NotifsHandlerIn {
/// The node should stop using custom protocols.
Disable,
/// Sends a message through the custom protocol substream.
///
/// > **Note**: This must **not** be a `ConsensusMessage`, `Transactions`, or
/// > `BlockAnnounce` message.
SendLegacy {
/// The message to send.
message: Vec<u8>,
},
/// Sends a notifications message.
SendNotification {
/// Name of the protocol for the message.
///
/// Must match one of the registered protocols. For backwards-compatibility reasons, if
/// the remote doesn't support this protocol, we use the legacy substream.
protocol_name: Cow<'static, [u8]>,
/// Message to send on the legacy substream if the protocol isn't available.
///
/// This corresponds to what you would have sent with `SendLegacy`.
encoded_fallback_message: Vec<u8>,
/// The message to send.
message: Vec<u8>,
},
}
/// Event that can be emitted by a `NotifsHandler`.
@@ -190,6 +187,8 @@ pub enum NotifsHandlerOut {
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
/// How notifications can be sent to this node.
notifications_sink: NotificationsSink,
},
/// The connection is closed for custom protocols.
@@ -227,19 +226,160 @@ pub enum NotifsHandlerOut {
},
}
/// Sink connected directly to the node background task. Allows sending notifications to the peer.
///
/// Can be cloned in order to obtain multiple references to the same peer.
#[derive(Debug, Clone)]
pub struct NotificationsSink {
inner: Arc<NotificationsSinkInner>,
}
#[derive(Debug)]
struct NotificationsSinkInner {
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
/// Sender to use in synchronous contexts. Uses a synchronous mutex.
/// This channel has a large capacity and is meant to be used in contexts where
/// back-pressure cannot be properly exerted.
/// It will be removed in a future version.
sync_channel: Mutex<mpsc::Sender<NotificationsSinkMessage>>,
}
/// Message emitted through the [`NotificationsSink`] and processed by the background task
/// dedicated to the peer.
#[derive(Debug)]
enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::send_legacy`].
Legacy {
message: Vec<u8>,
},
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification {
protocol_name: Vec<u8>,
encoded_fallback_message: Vec<u8>,
message: Vec<u8>,
},
/// Must close the connection.
ForceClose,
}
impl NotificationsSink {
/// Sends a message to the peer using the legacy substream.
///
/// If too many messages are already buffered, the message is silently discarded and the
/// connection to the peer will be closed shortly after.
///
/// This method will be removed in a future version.
pub fn send_legacy<'a>(&'a self, message: impl Into<Vec<u8>>) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Legacy {
message: message.into()
});
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
// buffer, and therefore that `try_send` will succeed.
let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose);
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
}
}
/// Sends a notification to the peer.
///
/// If too many messages are already buffered, the notification is silently discarded and the
/// connection to the peer will be closed shortly after.
///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol.
///
/// This method will be removed in a future version.
pub fn send_sync_notification<'a>(
&'a self,
protocol_name: &[u8],
encoded_fallback_message: impl Into<Vec<u8>>,
message: impl Into<Vec<u8>>
) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Notification {
protocol_name: protocol_name.to_owned(),
encoded_fallback_message: encoded_fallback_message.into(),
message: message.into()
});
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
// buffer, and therefore that `try_send` will succeed.
let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose);
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
}
}
/// Wait until the remote is ready to accept a notification.
///
/// Returns an error in the case where the connection is closed.
///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol.
pub async fn reserve_notification<'a>(&'a self, protocol_name: &[u8]) -> Result<Ready<'a>, ()> {
let mut lock = self.inner.async_channel.lock().await;
let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
if poll_ready.is_ok() {
Ok(Ready { protocol_name: protocol_name.to_owned(), lock })
} else {
Err(())
}
}
}
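A hedged sketch of the two sending paths offered by the sink; the driver function and the protocol name are hypothetical:

// Hypothetical caller exercising both paths. The synchronous path never blocks
// but may force-close the connection when its large buffer fills; the
// asynchronous path awaits a free slot, propagating back-pressure to the caller.
async fn send_both_ways(sink: &NotificationsSink) -> Result<(), ()> {
    // Fire-and-forget, queued in the 2048-entry synchronous buffer.
    sink.send_sync_notification(b"/test/proto/1", Vec::new(), b"fast".to_vec());

    // Back-pressure-friendly: reserve a slot in the 8-entry asynchronous buffer,
    // then queue the notification; an error means the connection closed.
    let ready = sink.reserve_notification(b"/test/proto/1").await?;
    ready.send(Vec::new(), b"careful".to_vec())
}

The force-close branch above relies on a documented property of `futures::channel::mpsc`: every `Sender` clone owns one guaranteed buffer slot. A standalone check of that property:

use futures::channel::mpsc;

fn clone_slot_demo() {
    // Capacity is `buffer + number of senders`; with buffer 0, each sender
    // still owns one guaranteed slot.
    let (mut tx, _rx) = mpsc::channel::<u8>(0);
    tx.try_send(1).unwrap();           // fills this sender's guaranteed slot
    assert!(tx.try_send(2).is_err());  // the original sender is now full
    let mut tx2 = tx.clone();
    tx2.try_send(3).unwrap();          // a fresh clone's slot is still free
}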
/// Notification slot is reserved and the notification can actually be sent.
#[must_use]
#[derive(Debug)]
pub struct Ready<'a> {
/// Guarded channel. The channel inside is guaranteed to not be full.
lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
/// Name of the protocol. Should match one of the protocols passed at initialization.
protocol_name: Vec<u8>,
}
impl<'a> Ready<'a> {
/// Consumes this slot's reservation and actually queues the notification.
///
/// Returns an error if the substream has been closed.
pub fn send(
mut self,
encoded_fallback_message: impl Into<Vec<u8>>,
notification: impl Into<Vec<u8>>
) -> Result<(), ()> {
self.lock.start_send(NotificationsSinkMessage::Notification {
protocol_name: self.protocol_name,
encoded_fallback_message: encoded_fallback_message.into(),
message: notification.into(),
}).map_err(|_| ())
}
}
/// Error specific to the collection of protocols.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum NotifsHandlerError {
/// Channel of synchronous notifications is full.
SyncNotificationsClogged,
/// Error in legacy protocol.
Legacy(<LegacyProtoHandler as ProtocolsHandler>::Error),
}
impl NotifsHandlerProto {
/// Builds a new handler.
///
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
///
/// The `queue_size_report` is an optional Prometheus metric that can report the size of the
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>,
queue_size_report: Option<HistogramVec>
) -> Self {
let list = list.into();
@@ -247,16 +387,7 @@ impl NotifsHandlerProto {
.clone()
.into_iter()
.map(|(proto_name, initial_message)| {
let queue_size_report = queue_size_report.as_ref().and_then(|qs| {
if let Ok(utf8) = str::from_utf8(&proto_name) {
Some(qs.with_label_values(&[utf8]))
} else {
log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", proto_name);
None
}
});
(NotifsOutHandlerProto::new(proto_name, queue_size_report), initial_message)
(NotifsOutHandlerProto::new(proto_name), initial_message)
}).collect();
let in_handlers = list.clone()
@@ -275,13 +406,7 @@ impl NotifsHandlerProto {
impl ProtocolsHandler for NotifsHandler {
type InEvent = NotifsHandlerIn;
type OutEvent = NotifsHandlerOut;
type Error = EitherError<
EitherError<
<NotifsInHandler as ProtocolsHandler>::Error,
<NotifsOutHandler as ProtocolsHandler>::Error,
>,
<LegacyProtoHandler as ProtocolsHandler>::Error,
>;
type Error = NotifsHandlerError;
type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>;
type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>;
// Index within the `out_handlers`; None for legacy
@@ -363,24 +488,6 @@ impl ProtocolsHandler for NotifsHandler {
self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse);
}
},
NotifsHandlerIn::SendLegacy { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
continue;
}
if handler.is_open() {
handler.inject_event(NotifsOutHandlerIn::Send(message));
return;
}
}
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage {
message: encoded_fallback_message,
});
},
}
}
@@ -461,6 +568,60 @@ impl ProtocolsHandler for NotifsHandler {
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
if let Some(notifications_sink_rx) = &mut self.notifications_sink_rx {
'poll_notifs_sink: loop {
// Before we poll the notifications sink receiver, check that all the notification
// channels are ready to send a message.
// TODO: it is planned that in the future we switch to one `NotificationsSink` per
// protocol, in which case each sink should wait only for its corresponding handler
// to be ready, and not all handlers
// see https://github.com/paritytech/substrate/issues/5670
for (out_handler, _) in &mut self.out_handlers {
match out_handler.poll_ready(cx) {
Poll::Ready(_) => {},
Poll::Pending => break 'poll_notifs_sink,
}
}
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) | Poll::Pending => break,
};
match message {
NotificationsSinkMessage::Legacy { message } => {
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage {
message
});
}
NotificationsSinkMessage::Notification {
protocol_name,
encoded_fallback_message,
message
} => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
continue;
}
if handler.is_open() {
handler.send_or_discard(message);
}
continue 'poll_notifs_sink;
}
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage {
message: encoded_fallback_message,
});
}
NotificationsSinkMessage::ForceClose => {
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged));
}
}
}
}
if let Poll::Ready(ev) = self.legacy.poll(cx) {
return match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
@@ -468,14 +629,37 @@ impl ProtocolsHandler for NotifsHandler {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) =>
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
endpoint,
received_handshake,
..
}) => {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
}),
};
debug_assert!(self.notifications_sink_rx.is_none());
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open { endpoint, received_handshake }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => {
// We consciously drop the receivers despite notifications being potentially
// still buffered up.
debug_assert!(self.notifications_sink_rx.is_some());
self.notifications_sink_rx = None;
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Closed { endpoint, reason }
)),
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
@@ -485,7 +669,7 @@ impl ProtocolsHandler for NotifsHandler {
NotifsHandlerOut::ProtocolError { is_severe, error }
)),
ProtocolsHandlerEvent::Close(err) =>
Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))),
Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
}
}
@@ -34,8 +34,10 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{debug, warn, error};
use prometheus_endpoint::Histogram;
use std::{borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration};
use std::{
borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll, Waker},
time::Duration
};
use wasm_timer::Instant;
/// Maximum duration to open a substream and receive the handshake message. After that, we
@@ -56,17 +58,14 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsOutHandlerProto {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>,
/// Optional Prometheus histogram to report message queue size variations.
queue_size_report: Option<Histogram>,
}
impl NotifsOutHandlerProto {
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>, queue_size_report: Option<Histogram>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self {
NotifsOutHandlerProto {
protocol_name: protocol_name.into(),
queue_size_report,
}
}
}
@@ -78,14 +77,12 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
DeniedUpgrade
}
fn into_handler(self, peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler {
fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler {
NotifsOutHandler {
protocol_name: self.protocol_name,
when_connection_open: Instant::now(),
queue_size_report: self.queue_size_report,
state: State::Disabled,
events_queue: VecDeque::new(),
peer_id: peer_id.clone(),
}
}
}
@@ -108,17 +105,11 @@ pub struct NotifsOutHandler {
/// When the connection with the remote has been successfully established.
when_connection_open: Instant,
/// Optional prometheus histogram to report message queue sizes variations.
queue_size_report: Option<Histogram>,
/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: VecDeque<ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>>,
/// Who we are connected to.
peer_id: PeerId,
}
/// Our relationship with the node we're connected to.
@@ -153,6 +144,11 @@ enum State {
Open {
/// Substream that is currently open.
substream: NotificationsOutSubstream<NegotiatedSubstream>,
/// Waker for the last task that got `Poll::Pending` from `poll_ready`, to notify
/// when the open substream closes due to being disabled or encountering an
/// error, i.e. to notify the task as soon as the substream becomes unavailable,
/// without waiting for an underlying I/O task wakeup.
close_waker: Option<Waker>,
/// The initial message that we sent. Necessary if we need to re-open a substream.
initial_message: Vec<u8>,
},
@@ -173,11 +169,6 @@ pub enum NotifsOutHandlerIn {
/// Disables the notifications substream for this node. This is the default state.
Disable,
/// Sends a message on the notifications substream. Ignored if the substream isn't open.
///
/// It is only valid to send this if the notifications substream has been enabled.
Send(Vec<u8>),
}
/// Event that can be emitted by a `NotifsOutHandler`.
@@ -216,6 +207,41 @@ impl NotifsOutHandler {
pub fn protocol_name(&self) -> &[u8] {
&self.protocol_name
}
/// Polls whether the outbound substream is ready to send a notification.
///
/// - Returns `Poll::Pending` if the substream is open but not ready to send a notification.
/// - Returns `Poll::Ready(true)` if the substream is ready to send a notification.
/// - Returns `Poll::Ready(false)` if the substream is closed.
///
pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<bool> {
if let State::Open { substream, close_waker, .. } = &mut self.state {
match substream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => Poll::Ready(true),
Poll::Ready(Err(_)) => Poll::Ready(false),
Poll::Pending => {
*close_waker = Some(cx.waker().clone());
Poll::Pending
}
}
} else {
Poll::Ready(false)
}
}
/// Sends out a notification.
///
/// If the substream is closed, or not ready to send out a notification yet, then the
/// notification is silently discarded.
///
/// You are encouraged to call [`NotifsOutHandler::poll_ready`] beforehand to determine
/// whether this will succeed. If `Poll::Ready(true)` is returned, then this method will send
/// out a notification.
pub fn send_or_discard(&mut self, notification: Vec<u8>) {
if let State::Open { substream, .. } = &mut self.state {
let _ = substream.start_send_unpin(notification);
}
}
}
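A hedged sketch of how the two methods above are meant to be paired; the driver function is hypothetical:

use std::task::{Context, Poll};

// Hypothetical driver: check readiness first so that `send_or_discard` never
// actually discards. Returns `true` if the notification was handed over.
fn poll_send(handler: &mut NotifsOutHandler, cx: &mut Context, msg: Vec<u8>) -> bool {
    match handler.poll_ready(cx) {
        Poll::Ready(true) => {
            handler.send_or_discard(msg);
            true
        }
        // Either the substream is closed (`Ready(false)`) or not yet ready.
        Poll::Ready(false) | Poll::Pending => false,
    }
}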
impl ProtocolsHandler for NotifsOutHandler {
@@ -247,7 +273,7 @@ impl ProtocolsHandler for NotifsOutHandler {
State::Opening { initial_message } => {
let ev = NotifsOutHandlerOut::Open { handshake: handshake_msg };
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev));
self.state = State::Open { substream, initial_message };
self.state = State::Open { substream, initial_message, close_waker: None };
},
// If the handler was disabled while we were negotiating the protocol, immediately
// close it.
@@ -310,31 +336,15 @@ impl ProtocolsHandler for NotifsOutHandler {
}
State::Opening { .. } => self.state = State::DisabledOpening,
State::Refused => self.state = State::Disabled,
State::Open { substream, .. } => self.state = State::DisabledOpen(substream),
State::Open { substream, close_waker, .. } => {
if let Some(close_waker) = close_waker {
close_waker.wake();
}
self.state = State::DisabledOpen(substream)
},
State::Poisoned => error!("☎️ Notifications handler in a poisoned state"),
}
}
NotifsOutHandlerIn::Send(msg) =>
if let State::Open { substream, .. } = &mut self.state {
if substream.push_message(msg).is_err() {
warn!(
target: "sub-libp2p",
"📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})",
self.peer_id,
self.protocol_name,
);
}
if let Some(metric) = &self.queue_size_report {
metric.observe(substream.queue_len() as f64);
}
} else {
// This is an API misuse.
warn!(
target: "sub-libp2p",
"📞 Tried to send a notification on a disabled handler"
);
},
}
}
@@ -375,10 +385,14 @@ impl ProtocolsHandler for NotifsOutHandler {
}
match &mut self.state {
State::Open { substream, initial_message } =>
State::Open { substream, initial_message, close_waker } =>
match Sink::poll_flush(Pin::new(substream), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
if let Some(close_waker) = close_waker.take() {
close_waker.wake();
}
// We try to re-open a substream.
let initial_message = mem::replace(initial_message, Vec::new());
self.state = State::Opening { initial_message: initial_message.clone() };
@@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None),
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset),
addrs: addrs
.iter()
.enumerate()
@@ -221,9 +221,10 @@ fn two_nodes_transfer_lots_of_packets() {
// We spawn two nodes, then make the first one send lots of packets to the second one. The test
// ends when the second one has received all of them.
// Note that if we go too high, we will reach the limit to the number of simultaneous
// substreams allowed by the multiplexer.
const NUM_PACKETS: u32 = 5000;
// This test consists of transferring this number of packets. Considering that (by
// design) the connection gets closed if one of the remotes can't keep up with the pace,
// this number should not exceed the size of the buffer of pending notifications.
const NUM_PACKETS: u32 = 512;
let (mut service1, mut service2) = build_nodes();
@@ -174,7 +174,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin {
}
// Indicating that the remote is clogged if that's the case.
if self.send_queue.len() >= 2048 {
if self.send_queue.len() >= 1536 {
if !self.clogged_fuse {
// Note: this fuse is important not just for preventing us from flooding the logs;
// if you remove the fuse, then we will always return early from this function and
@@ -34,17 +34,15 @@
///
use bytes::BytesMut;
use futures::{prelude::*, ready};
use futures::prelude::*;
use futures_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, collections::VecDeque, convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, io, iter, mem, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
const MAX_HANDSHAKE_SIZE: usize = 1024;
/// Maximum number of buffered messages before we refuse to accept more.
const MAX_PENDING_MESSAGES: usize = 512;
/// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional
/// stream of messages.
@@ -93,10 +91,6 @@ pub struct NotificationsOutSubstream<TSubstream> {
/// Substream where to send messages.
#[pin]
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
/// Queue of messages waiting to be sent.
messages_queue: VecDeque<Vec<u8>>,
/// If true, we need to flush `socket`.
need_flush: bool,
}
impl NotificationsIn {
@@ -272,80 +266,38 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
Ok((handshake, NotificationsOutSubstream {
socket: Framed::new(socket, UviBytes::default()),
messages_queue: VecDeque::with_capacity(MAX_PENDING_MESSAGES),
need_flush: false,
}))
})
}
}
impl<TSubstream> NotificationsOutSubstream<TSubstream> {
/// Returns the number of items in the queue, capped to `u32::max_value()`.
pub fn queue_len(&self) -> u32 {
u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value())
}
/// Push a message to the queue of messages.
///
/// This has the same effect as the `Sink::start_send` implementation.
pub fn push_message(&mut self, item: Vec<u8>) -> Result<(), NotificationsOutError> {
if self.messages_queue.len() >= MAX_PENDING_MESSAGES {
return Err(NotificationsOutError::Clogged);
}
self.messages_queue.push_back(item);
Ok(())
}
}
impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
{
type Error = NotificationsOutError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_ready(this.socket.as_mut(), cx)
.map_err(NotificationsOutError::Io)
}
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
self.push_message(item)
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let mut this = self.project();
Sink::start_send(this.socket.as_mut(), io::Cursor::new(item))
.map_err(NotificationsOutError::Io)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
while !this.messages_queue.is_empty() {
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
Poll::Ready(Ok(())) => {
let msg = this.messages_queue.pop_front()
.expect("checked for !is_empty above; qed");
Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg))?;
*this.need_flush = true;
},
Poll::Pending => return Poll::Pending,
}
}
if *this.need_flush {
match Sink::poll_flush(this.socket.as_mut(), cx) {
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
Poll::Ready(Ok(())) => *this.need_flush = false,
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(()))
Sink::poll_flush(this.socket.as_mut(), cx)
.map_err(NotificationsOutError::Io)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(Sink::poll_flush(self.as_mut(), cx))?;
let this = self.project();
match Sink::poll_close(this.socket, cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(From::from(err))),
Poll::Pending => Poll::Pending,
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_close(this.socket.as_mut(), cx)
.map_err(NotificationsOutError::Io)
}
}
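With the internal queue removed, back-pressure flows through the standard `Sink` contract. A standalone sketch of a compliant sender; the generic helper is illustrative, not part of this change:

use futures::{Sink, SinkExt};

// `SinkExt::send` awaits `poll_ready` before `start_send`, then flushes, so a
// slow remote now stalls the sender instead of growing a local queue that
// previously had to be capped and force-closed.
async fn send_one<S>(substream: &mut S, msg: Vec<u8>) -> Result<(), S::Error>
where
    S: Sink<Vec<u8>> + Unpin,
{
    substream.send(msg).await
}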
@@ -386,13 +338,6 @@ impl From<unsigned_varint::io::ReadError> for NotificationsHandshakeError {
pub enum NotificationsOutError {
/// I/O error on the substream.
Io(io::Error),
/// Remote doesn't process our messages quickly enough.
///
/// > **Note**: This is not necessarily the remote's fault, and could also be caused by the
/// > local node sending data too quickly. Properly doing back-pressure, however,
/// > would require a deep refactoring effort in Substrate as a whole.
Clogged,
}
#[cfg(test)]
@@ -402,7 +347,6 @@ mod tests {
use async_std::net::{TcpListener, TcpStream};
use futures::{prelude::*, channel::oneshot};
use libp2p::core::upgrade;
use std::pin::Pin;
#[test]
fn basic_works() {
@@ -582,57 +526,4 @@ mod tests {
async_std::task::block_on(client);
}
#[test]
fn buffer_is_full_closes_connection() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1";
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, vec![]),
upgrade::Version::V1
).await.unwrap();
assert!(handshake.is_empty());
// Push an item and flush so that the test works.
substream.send(b"hello world".to_vec()).await.unwrap();
for _ in 0..32768 {
// Push an item on the sink without flushing until an error happens because the
// buffer is full.
let message = b"hello world!".to_vec();
if future::poll_fn(|cx| Sink::poll_ready(Pin::new(&mut substream), cx)).await.is_err() {
return Ok(());
}
if Sink::start_send(Pin::new(&mut substream), message).is_err() {
return Ok(());
}
}
Err(())
});
async_std::task::block_on(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
).await.unwrap();
assert!(initial_message.is_empty());
substream.send_handshake(vec![]);
// Process one message so that the handshake and all works.
let _ = substream.next().await.unwrap().unwrap();
client.await.unwrap();
});
}
}