Use inbound peerslot slots when a substream is received, rather than a connection (#7464)

* Use inbound peerslot slots when a substream is received, rather than a connection

* Refactor PeerState

* Some bugfixes

* Fix warnings so that CI runs, gmlrlblbl

* Bugfixes

* Update docs

* Apply suggestions from code review

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Clean up Banned state

* Refactor connections state

* Fix possibility of Enabled with no Opening or Open connection

* Line width

* Add some debug_asserts! and fix TODO

* Refactor legacy handler

* Rewrite group.rs entirely [part 1]

* Rewrite group.rs entirely [part 2]

* Remove faulty assertion

Because of the asynchronous nature of the behaviour <-> handler communications, it is possible to receive notifications while in the Closing state

* Don't poll the legacy substream is not Open

* Tolerate when not all substreams are accepted

* Remove TODOs

* Dummy commit to make CI log interesting things

* Try race condition fix

* Revert "Try race condition fix"

This reverts commit 0675c659d06195c30f8c5bc13e2d88141d57a3ba.

* Correctly rebuild pending_opening

* Minor tweaks

* Printlns for CI debugging

* Revert "Printlns for CI debugging"

This reverts commit e7852a231f4fc418898767aaa27c9a4358e12e8b.

* Revert "Dummy commit to make CI log interesting things"

This reverts commit 259ddd74088e53e7c6a9b0a62a8d1573a0063ce3.

* mv group.rs ../handler.rs

* Apply suggestions from code review

Co-authored-by: Max Inden <mail@max-inden.de>

* Banned => Backoff

* Mention the actual PeerStates

* OpenDesired -> OpenDesiredByRemote

* OpeningThenClosing

* Add doc links to PeerState

* Simplify increment logic

* One more debug_assert

* debug_assert!

* OpenDesiredByRemote

* Update client/network/src/protocol/generic_proto/behaviour.rs

Co-authored-by: Max Inden <mail@max-inden.de>

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-11-16 16:46:36 +01:00
committed by GitHub
parent 2f5f4fe858
commit 80a74acdd1
10 changed files with 2227 additions and 2630 deletions
@@ -21,7 +21,7 @@
//! network, then performs the Substrate protocol handling on top.
pub use self::behaviour::{GenericProto, GenericProtoOut};
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready, LegacyConnectionKillError};
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready};
mod behaviour;
mod handler;
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -1,737 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for both incoming
//! and outgoing substreams for all gossiping protocols together.
//!
//! This is the main implementation of `ProtocolsHandler` in this crate, that handles all the
//! protocols that are Substrate-related and outside of the scope of libp2p.
//!
//! # Usage
//!
//! The handler can be in one of the following states: `Initial`, `Enabled`, `Disabled`.
//!
//! The `Initial` state is the state that the handler initially is in. It is a temporary state
//! during which the user must either enable or disable the handler. After that, the handler stays
//! either enabled or disabled.
//!
//! On the wire, we try to open the following substreams:
//!
//! - One substream for each notification protocol passed as parameter to the
//! `NotifsHandlerProto::new` function.
//! - One "legacy" substream used for anything non-related to gossiping, and used as a fallback
//! in case the notification protocol can't be opened.
//!
//! When the handler is in the `Enabled` state, we immediately open and try to maintain all the
//! aforementioned substreams. When the handler is in the `Disabled` state, we immediately close
//! (or abort opening) all these substreams. It is intended that in the future we allow states in
//! which some protocols are open and not others. Symmetrically, we allow incoming
//! Substrate-related substreams if and only if we are in the `Enabled` state.
//!
//! The user has the choice between sending a message with `SendNotification`, to send a
//! notification, and `SendLegacy`, to send any other kind of message.
//!
use crate::protocol::generic_proto::{
handler::legacy::{LegacyProtoHandler, LegacyProtoHandlerProto, LegacyProtoHandlerIn, LegacyProtoHandlerOut},
handler::notif_in::{NotifsInHandlerProto, NotifsInHandler, NotifsInHandlerIn, NotifsInHandlerOut},
handler::notif_out::{NotifsOutHandlerProto, NotifsOutHandler, NotifsOutHandlerIn, NotifsOutHandlerOut},
upgrade::{NotificationsIn, NotificationsOut, NotificationsHandshakeError, RegisteredProtocol, UpgradeCollec},
};
use bytes::BytesMut;
use libp2p::core::{either::EitherOutput, ConnectedPoint, PeerId};
use libp2p::core::upgrade::{UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
KeepAlive,
ProtocolsHandlerUpgrErr,
SubstreamProtocol,
NegotiatedSubstream,
};
use futures::{
channel::mpsc,
lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
prelude::*
};
use log::{debug, error};
use parking_lot::{Mutex, RwLock};
use std::{borrow::Cow, str, sync::Arc, task::{Context, Poll}};
/// Number of pending notifications in asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;
/// Number of pending notifications in synchronous contexts.
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
/// it is turned into a [`NotifsHandler`].
///
/// See the documentation at the module level for more information.
pub struct NotifsHandlerProto {
/// Prototypes for handlers for inbound substreams, and the message we respond with in the
/// handshake.
in_handlers: Vec<(NotifsInHandlerProto, Arc<RwLock<Vec<u8>>>)>,
/// Prototypes for handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandlerProto, Arc<RwLock<Vec<u8>>>)>,
/// Prototype for handler for backwards-compatibility.
legacy: LegacyProtoHandlerProto,
}
/// The actual handler once the connection has been established.
///
/// See the documentation at the module level for more information.
pub struct NotifsHandler {
/// Handlers for inbound substreams, and the message we respond with in the handshake.
in_handlers: Vec<(NotifsInHandler, Arc<RwLock<Vec<u8>>>)>,
/// Handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,
/// Whether we are the connection dialer or listener.
endpoint: ConnectedPoint,
/// Handler for backwards-compatibility.
legacy: LegacyProtoHandler,
/// In the situation where either the legacy substream has been opened or the handshake-bearing
/// notifications protocol is open, but we haven't sent out any [`NotifsHandlerOut::Open`]
/// event yet, this contains the received handshake waiting to be reported through the
/// external API.
pending_handshake: Option<Vec<u8>>,
/// State of this handler.
enabled: EnabledState,
/// If we receive inbound substream requests while in initialization mode,
/// we push the corresponding index here and process them when the handler
/// gets enabled/disabled.
pending_in: Vec<usize>,
/// If `Some`, contains the two `Receiver`s connected to the [`NotificationsSink`] that has
/// been sent out. The notifications to send out can be pulled from this receivers.
/// We use two different channels in order to have two different channel sizes, but from the
/// receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
///
/// Contains `Some` if and only if it has been reported to the user that the substreams are
/// open.
notifications_sink_rx: Option<
stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
>
>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum EnabledState {
Initial,
Enabled,
Disabled,
}
impl IntoProtocolsHandler for NotifsHandlerProto {
type Handler = NotifsHandler;
fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
let in_handlers = self.in_handlers.iter()
.map(|(h, _)| h.inbound_protocol())
.collect::<UpgradeCollec<_>>();
SelectUpgrade::new(in_handlers, self.legacy.inbound_protocol())
}
fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
NotifsHandler {
in_handlers: self.in_handlers
.into_iter()
.map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg))
.collect(),
out_handlers: self.out_handlers
.into_iter()
.map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg))
.collect(),
endpoint: connected_point.clone(),
legacy: self.legacy.into_handler(remote_peer_id, connected_point),
pending_handshake: None,
enabled: EnabledState::Initial,
pending_in: Vec::new(),
notifications_sink_rx: None,
}
}
}
/// Event that can be received by a `NotifsHandler`.
#[derive(Debug, Clone)]
pub enum NotifsHandlerIn {
/// The node should start using custom protocols.
Enable,
/// The node should stop using custom protocols.
Disable,
}
/// Event that can be emitted by a `NotifsHandler`.
#[derive(Debug)]
pub enum NotifsHandlerOut {
/// The connection is open for custom protocols.
Open {
/// The endpoint of the connection that is open for custom protocols.
endpoint: ConnectedPoint,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
/// How notifications can be sent to this node.
notifications_sink: NotificationsSink,
},
/// The connection is closed for custom protocols.
Closed {
/// The reason for closing, for diagnostic purposes.
reason: Cow<'static, str>,
/// The endpoint of the connection that closed for custom protocols.
endpoint: ConnectedPoint,
},
/// Received a non-gossiping message on the legacy substream.
CustomMessage {
/// Message that has been received.
///
/// Keep in mind that this can be a `ConsensusMessage` message, which then contains a
/// notification.
message: BytesMut,
},
/// Received a message on a custom protocol substream.
Notification {
/// Name of the protocol of the message.
protocol_name: Cow<'static, str>,
/// Message that has been received.
message: BytesMut,
},
}
/// Sink connected directly to the node background task. Allows sending notifications to the peer.
///
/// Can be cloned in order to obtain multiple references to the same peer.
#[derive(Debug, Clone)]
pub struct NotificationsSink {
inner: Arc<NotificationsSinkInner>,
}
#[derive(Debug)]
struct NotificationsSinkInner {
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
/// Sender to use in synchronous contexts. Uses a synchronous mutex.
/// This channel has a large capacity and is meant to be used in contexts where
/// back-pressure cannot be properly exerted.
/// It will be removed in a future version.
sync_channel: Mutex<mpsc::Sender<NotificationsSinkMessage>>,
}
/// Message emitted through the [`NotificationsSink`] and processed by the background task
/// dedicated to the peer.
#[derive(Debug)]
enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification {
protocol_name: Cow<'static, str>,
message: Vec<u8>,
},
/// Must close the connection.
ForceClose,
}
impl NotificationsSink {
/// Sends a notification to the peer.
///
/// If too many messages are already buffered, the notification is silently discarded and the
/// connection to the peer will be closed shortly after.
///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol.
///
/// This method will be removed in a future version.
pub fn send_sync_notification<'a>(
&'a self,
protocol_name: Cow<'static, str>,
message: impl Into<Vec<u8>>
) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Notification {
protocol_name,
message: message.into()
});
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
// buffer, and therefore that `try_send` will succeed.
let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose);
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
}
}
/// Wait until the remote is ready to accept a notification.
///
/// Returns an error in the case where the connection is closed.
///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol.
pub async fn reserve_notification<'a>(&'a self, protocol_name: Cow<'static, str>) -> Result<Ready<'a>, ()> {
let mut lock = self.inner.async_channel.lock().await;
let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
if poll_ready.is_ok() {
Ok(Ready { protocol_name: protocol_name, lock })
} else {
Err(())
}
}
}
/// Notification slot is reserved and the notification can actually be sent.
#[must_use]
#[derive(Debug)]
pub struct Ready<'a> {
/// Guarded channel. The channel inside is guaranteed to not be full.
lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
/// Name of the protocol. Should match one of the protocols passed at initialization.
protocol_name: Cow<'static, str>,
}
impl<'a> Ready<'a> {
/// Consumes this slots reservation and actually queues the notification.
///
/// Returns an error if the substream has been closed.
pub fn send(
mut self,
notification: impl Into<Vec<u8>>
) -> Result<(), ()> {
self.lock.start_send(NotificationsSinkMessage::Notification {
protocol_name: self.protocol_name,
message: notification.into(),
}).map_err(|_| ())
}
}
/// Error specific to the collection of protocols.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum NotifsHandlerError {
/// Channel of synchronous notifications is full.
SyncNotificationsClogged,
/// Error in legacy protocol.
Legacy(<LegacyProtoHandler as ProtocolsHandler>::Error),
}
impl NotifsHandlerProto {
/// Builds a new handler.
///
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
///
/// The first protocol in `list` is special-cased as the protocol that contains the handshake
/// to report through the [`NotifsHandlerOut::Open`] event.
///
/// # Panic
///
/// - Panics if `list` is empty.
///
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
) -> Self {
let list = list.into();
assert!(!list.is_empty());
let out_handlers = list
.clone()
.into_iter()
.map(|(proto_name, initial_message)| {
(NotifsOutHandlerProto::new(proto_name), initial_message)
}).collect();
let in_handlers = list.clone()
.into_iter()
.map(|(proto_name, msg)| (NotifsInHandlerProto::new(proto_name), msg))
.collect();
NotifsHandlerProto {
in_handlers,
out_handlers,
legacy: LegacyProtoHandlerProto::new(legacy),
}
}
}
impl ProtocolsHandler for NotifsHandler {
type InEvent = NotifsHandlerIn;
type OutEvent = NotifsHandlerOut;
type Error = NotifsHandlerError;
type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>;
type OutboundProtocol = NotificationsOut;
// Index within the `out_handlers`
type OutboundOpenInfo = usize;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
let in_handlers = self.in_handlers.iter()
.map(|(h, _)| h.listen_protocol().into_upgrade().1)
.collect::<UpgradeCollec<_>>();
let proto = SelectUpgrade::new(in_handlers, self.legacy.listen_protocol().into_upgrade().1);
SubstreamProtocol::new(proto, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
match out {
EitherOutput::First((out, num)) =>
self.in_handlers[num].0.inject_fully_negotiated_inbound(out, ()),
EitherOutput::Second(out) =>
self.legacy.inject_fully_negotiated_inbound(out, ()),
}
}
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
num: Self::OutboundOpenInfo
) {
self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ())
}
fn inject_event(&mut self, message: NotifsHandlerIn) {
match message {
NotifsHandlerIn::Enable => {
if let EnabledState::Enabled = self.enabled {
debug!("enabling already-enabled handler");
}
self.enabled = EnabledState::Enabled;
self.legacy.inject_event(LegacyProtoHandlerIn::Enable);
for (handler, initial_message) in &mut self.out_handlers {
// We create `initial_message` on a separate line to be sure that the lock
// is released as soon as possible.
let initial_message = initial_message.read().clone();
handler.inject_event(NotifsOutHandlerIn::Enable {
initial_message,
});
}
for num in self.pending_in.drain(..) {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = self.in_handlers[num].1.read().clone();
self.in_handlers[num].0
.inject_event(NotifsInHandlerIn::Accept(handshake_message));
}
},
NotifsHandlerIn::Disable => {
if let EnabledState::Disabled = self.enabled {
debug!("disabling already-disabled handler");
}
self.legacy.inject_event(LegacyProtoHandlerIn::Disable);
// The notifications protocols start in the disabled state. If we were in the
// "Initial" state, then we shouldn't disable the notifications protocols again.
if self.enabled != EnabledState::Initial {
for (handler, _) in &mut self.out_handlers {
handler.inject_event(NotifsOutHandlerIn::Disable);
}
}
self.enabled = EnabledState::Disabled;
for num in self.pending_in.drain(..) {
self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse);
}
},
}
}
fn inject_dial_upgrade_error(
&mut self,
num: usize,
err: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
) {
match err {
ProtocolsHandlerUpgrErr::Timeout =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Timeout
),
ProtocolsHandlerUpgrErr::Timer =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Timer
),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))
),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err))
),
}
}
fn connection_keep_alive(&self) -> KeepAlive {
// Iterate over each handler and return the maximum value.
let mut ret = self.legacy.connection_keep_alive();
if ret.is_yes() {
return KeepAlive::Yes;
}
for (handler, _) in &self.in_handlers {
let val = handler.connection_keep_alive();
if val.is_yes() {
return KeepAlive::Yes;
}
if ret < val { ret = val; }
}
for (handler, _) in &self.out_handlers {
let val = handler.connection_keep_alive();
if val.is_yes() {
return KeepAlive::Yes;
}
if ret < val { ret = val; }
}
ret
}
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
if let Some(notifications_sink_rx) = &mut self.notifications_sink_rx {
'poll_notifs_sink: loop {
// Before we poll the notifications sink receiver, check that all the notification
// channels are ready to send a message.
// TODO: it is planned that in the future we switch to one `NotificationsSink` per
// protocol, in which case each sink should wait only for its corresponding handler
// to be ready, and not all handlers
// see https://github.com/paritytech/substrate/issues/5670
for (out_handler, _) in &mut self.out_handlers {
match out_handler.poll_ready(cx) {
Poll::Ready(_) => {},
Poll::Pending => break 'poll_notifs_sink,
}
}
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) | Poll::Pending => break,
};
match message {
NotificationsSinkMessage::Notification {
protocol_name,
message
} => {
let mut found_any_with_name = false;
for (handler, _) in &mut self.out_handlers {
if *handler.protocol_name() == protocol_name {
found_any_with_name = true;
if handler.is_open() {
handler.send_or_discard(message);
continue 'poll_notifs_sink;
}
}
}
// This code can be reached via the following scenarios:
//
// - User tried to send a notification on a non-existing protocol. This
// most likely relates to https://github.com/paritytech/substrate/issues/6827
// - User tried to send a notification to a peer we're not or no longer
// connected to. This happens in a normal scenario due to the racy nature
// of connections and disconnections, and is benign.
//
// We print a warning in the former condition.
if !found_any_with_name {
log::warn!(
target: "sub-libp2p",
"Tried to send a notification on non-registered protocol: {:?}",
protocol_name
);
}
}
NotificationsSinkMessage::ForceClose => {
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged));
}
}
}
}
// If `self.pending_handshake` is `Some`, we are in a state where the handshake-bearing
// substream (either the legacy substream or the one special-cased as providing the
// handshake) is open but the user isn't aware yet of the substreams being open.
// When that is the case, neither the legacy substream nor the incoming notifications
// substreams should be polled, otherwise there is a risk of receiving messages from them.
if self.pending_handshake.is_none() {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, .. } =>
match *protocol.info() {},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
received_handshake,
..
}) => {
if self.notifications_sink_rx.is_none() {
debug_assert!(self.pending_handshake.is_none());
self.pending_handshake = Some(received_handshake);
}
cx.waker().wake_by_ref();
return Poll::Pending;
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => {
// We consciously drop the receivers despite notifications being potentially
// still buffered up.
self.notifications_sink_rx = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Closed { endpoint: self.endpoint.clone(), reason }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
))
},
ProtocolsHandlerEvent::Close(err) =>
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
}
}
}
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
loop {
let poll = if self.notifications_sink_rx.is_some() {
handler.poll(cx)
} else {
handler.poll_process(cx)
};
let ev = match poll {
Poll::Ready(e) => e,
Poll::Pending => break,
};
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
error!("Incoming substream handler tried to open a substream"),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
debug_assert!(self.pending_handshake.is_none());
if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification {
message,
protocol_name: handler.protocol_name().clone(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
}
},
}
}
}
for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() {
while let Poll::Ready(ev) = handler.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol
.map_info(|()| handler_num),
}),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
// Opened substream on the handshake-bearing notification protocol.
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { handshake })
if handler_num == 0 =>
{
if self.notifications_sink_rx.is_none() && self.pending_handshake.is_none() {
self.pending_handshake = Some(handshake);
}
},
// Nothing to do in response to other notification substreams being opened
// or closed.
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Open { .. }) => {},
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Refused) => {},
}
}
}
if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) {
if let Some(handshake) = self.pending_handshake.take() {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
}),
};
debug_assert!(self.notifications_sink_rx.is_none());
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open {
endpoint: self.endpoint.clone(),
received_handshake: handshake,
notifications_sink
}
))
}
}
Poll::Pending
}
}
@@ -1,559 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::protocol::generic_proto::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream};
use bytes::BytesMut;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::core::{ConnectedPoint, PeerId, Endpoint};
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
KeepAlive,
ProtocolsHandlerUpgrErr,
SubstreamProtocol,
NegotiatedSubstream,
};
use log::{debug, error};
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, collections::VecDeque, convert::Infallible, error, fmt, io, mem};
use std::{pin::Pin, task::{Context, Poll}, time::Duration};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
/// it is turned into a `LegacyProtoHandler`. It then handles all communications that are specific
/// to Substrate on that single connection.
///
/// Note that there can be multiple instance of this struct simultaneously for same peer,
/// if there are multiple established connections to the peer.
///
/// ## State of the handler
///
/// There are six possible states for the handler:
///
/// - Enabled and open, which is a normal operation.
/// - Enabled and closed, in which case it will try to open substreams.
/// - Disabled and open, in which case it will try to close substreams.
/// - Disabled and closed, in which case the handler is idle. The connection will be
/// garbage-collected after a few seconds if nothing more happens.
/// - Initializing and open.
/// - Initializing and closed, which is the state the handler starts in.
///
/// The Init/Enabled/Disabled state is entirely controlled by the user by sending `Enable` or
/// `Disable` messages to the handler. The handler itself never transitions automatically between
/// these states. For example, if the handler reports a network misbehaviour, it will close the
/// substreams but it is the role of the user to send a `Disabled` event if it wants the connection
/// to close. Otherwise, the handler will try to reopen substreams.
///
/// The handler starts in the "Initializing" state and must be transitionned to Enabled or Disabled
/// as soon as possible.
///
/// The Open/Closed state is decided by the handler and is reported with the `CustomProtocolOpen`
/// and `CustomProtocolClosed` events. The `CustomMessage` event can only be generated if the
/// handler is open.
///
/// ## How it works
///
/// When the handler is created, it is initially in the `Init` state and waits for either a
/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to
/// toggle the handler between the disabled and enabled states.
///
/// When the handler switches to "enabled", it opens a substream and negotiates the protocol named
/// `/substrate/xxx`, where `xxx` is chosen by the user and depends on the chain.
///
/// For backwards compatibility reasons, when we switch to "enabled" for the first time (while we
/// are still in "init" mode) and we are the connection listener, we don't open a substream.
///
/// In order the handle the situation where both the remote and us get enabled at the same time,
/// we tolerate multiple substreams open at the same time. Messages are transmitted on an arbitrary
/// substream. The endpoints don't try to agree on a single substream.
///
/// We consider that we are now "closed" if the remote closes all the existing substreams.
/// Re-opening it can then be performed by closing all active substream and re-opening one.
///
pub struct LegacyProtoHandlerProto {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol,
}
impl LegacyProtoHandlerProto {
/// Builds a new `LegacyProtoHandlerProto`.
pub fn new(protocol: RegisteredProtocol) -> Self {
LegacyProtoHandlerProto {
protocol,
}
}
}
impl IntoProtocolsHandler for LegacyProtoHandlerProto {
type Handler = LegacyProtoHandler;
fn inbound_protocol(&self) -> RegisteredProtocol {
self.protocol.clone()
}
fn into_handler(self, remote_peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler {
LegacyProtoHandler {
protocol: self.protocol,
remote_peer_id: remote_peer_id.clone(),
state: ProtocolState::Init {
substreams: SmallVec::new(),
init_deadline: Delay::new(Duration::from_secs(20))
},
events_queue: VecDeque::new(),
}
}
}
/// The actual handler once the connection has been established.
pub struct LegacyProtoHandler {
/// Configuration for the protocol upgrade to negotiate.
protocol: RegisteredProtocol,
/// State of the communications with the remote.
state: ProtocolState,
/// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have
/// any influence on the behaviour.
remote_peer_id: PeerId,
/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: VecDeque<
ProtocolsHandlerEvent<RegisteredProtocol, Infallible, LegacyProtoHandlerOut, ConnectionKillError>
>,
}
/// State of the handler.
enum ProtocolState {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
Init {
/// List of substreams opened by the remote but that haven't been processed yet.
/// For each substream, also includes the handshake message that we have received.
substreams: SmallVec<[(RegisteredProtocolSubstream<NegotiatedSubstream>, Vec<u8>); 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Delay,
},
/// Handler is ready to accept incoming substreams.
/// If we are in this state, we haven't sent any `CustomProtocolOpen` yet.
Opening,
/// Normal operating mode. Contains the substreams that are open.
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
Normal {
/// The substreams where bidirectional communications happen.
substreams: SmallVec<[RegisteredProtocolSubstream<NegotiatedSubstream>; 4]>,
/// Contains substreams which are being shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<NegotiatedSubstream>; 4]>,
},
/// We are disabled. Contains substreams that are being closed.
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
Disabled {
/// List of substreams to shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<NegotiatedSubstream>; 6]>,
/// If true, we should reactivate the handler after all the substreams in `shutdown` have
/// been closed.
///
/// Since we don't want to mix old and new substreams, we wait for all old substreams to
/// be closed before opening any new one.
reenable: bool,
},
/// In this state, we don't care about anything anymore and need to kill the connection as soon
/// as possible.
KillAsap,
/// We sometimes temporarily switch to this state during processing. If we are in this state
/// at the beginning of a method, that means something bad happened in the source code.
Poisoned,
}
/// Event that can be received by a `LegacyProtoHandler`.
#[derive(Debug)]
pub enum LegacyProtoHandlerIn {
/// The node should start using custom protocols.
Enable,
/// The node should stop using custom protocols.
Disable,
}
/// Event that can be emitted by a `LegacyProtoHandler`.
#[derive(Debug)]
pub enum LegacyProtoHandlerOut {
/// Opened a custom protocol with the remote.
CustomProtocolOpen {
/// Version of the protocol that has been opened.
version: u8,
/// Handshake message that has been sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
},
/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Reason why the substream closed, for diagnostic purposes.
reason: Cow<'static, str>,
},
/// Receives a message on a custom protocol substream.
CustomMessage {
/// Message that has been received.
message: BytesMut,
},
}
impl LegacyProtoHandler {
/// Enables the handler.
fn enable(&mut self) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
ProtocolState::Poisoned
}
ProtocolState::Init { substreams: mut incoming, .. } => {
if incoming.is_empty() {
ProtocolState::Opening
} else {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: incoming[0].0.protocol_version(),
received_handshake: mem::replace(&mut incoming[0].1, Vec::new()),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: incoming.into_iter().map(|(s, _)| s).collect(),
shutdown: SmallVec::new()
}
}
}
st @ ProtocolState::KillAsap => st,
st @ ProtocolState::Opening { .. } => st,
st @ ProtocolState::Normal { .. } => st,
ProtocolState::Disabled { shutdown, .. } => {
ProtocolState::Disabled { shutdown, reenable: true }
}
}
}
/// Disables the handler.
fn disable(&mut self) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
ProtocolState::Poisoned
}
ProtocolState::Init { substreams: shutdown, .. } => {
let mut shutdown = shutdown.into_iter().map(|(s, _)| s).collect::<SmallVec<[_; 6]>>();
for s in &mut shutdown {
s.shutdown();
}
ProtocolState::Disabled { shutdown, reenable: false }
}
ProtocolState::Opening { .. } | ProtocolState::Normal { .. } =>
// At the moment, if we get disabled while things were working, we kill the entire
// connection in order to force a reset of the state.
// This is obviously an extremely shameful way to do things, but at the time of
// the writing of this comment, the networking works very poorly and a solution
// needs to be found.
ProtocolState::KillAsap,
ProtocolState::Disabled { shutdown, .. } =>
ProtocolState::Disabled { shutdown, reenable: false },
ProtocolState::KillAsap => ProtocolState::KillAsap,
};
}
/// Polls the state for events. Optionally returns an event to produce.
#[must_use]
fn poll_state(&mut self, cx: &mut Context)
-> Option<ProtocolsHandlerEvent<RegisteredProtocol, Infallible, LegacyProtoHandlerOut, ConnectionKillError>> {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
self.state = ProtocolState::Poisoned;
None
}
ProtocolState::Init { substreams, mut init_deadline } => {
match Pin::new(&mut init_deadline).poll(cx) {
Poll::Ready(()) => {
error!(target: "sub-libp2p", "Handler initialization process is too long \
with {:?}", self.remote_peer_id);
self.state = ProtocolState::KillAsap;
},
Poll::Pending => {
self.state = ProtocolState::Init { substreams, init_deadline };
}
}
None
}
ProtocolState::Opening => {
self.state = ProtocolState::Opening;
None
}
ProtocolState::Normal { mut substreams, mut shutdown } => {
for n in (0..substreams.len()).rev() {
let mut substream = substreams.swap_remove(n);
match Pin::new(&mut substream).poll_next(cx) {
Poll::Pending => substreams.push(substream),
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Message(message)))) => {
let event = LegacyProtoHandlerOut::CustomMessage {
message
};
substreams.push(substream);
self.state = ProtocolState::Normal { substreams, shutdown };
return Some(ProtocolsHandlerEvent::Custom(event));
},
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) => {
shutdown.push(substream);
if substreams.is_empty() {
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
reason: "Legacy substream clogged".into(),
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
}
}
Poll::Ready(None) => {
shutdown.push(substream);
if substreams.is_empty() {
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
reason: "All substreams have been closed by the remote".into(),
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
}
}
Poll::Ready(Some(Err(err))) => {
if substreams.is_empty() {
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
reason: format!("Error on the last substream: {:?}", err).into(),
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
} else {
debug!(target: "sub-libp2p", "Error on extra substream: {:?}", err);
}
}
}
}
// This code is reached is none if and only if none of the substreams are in a ready state.
self.state = ProtocolState::Normal { substreams, shutdown };
None
}
ProtocolState::Disabled { mut shutdown, reenable } => {
shutdown_list(&mut shutdown, cx);
// If `reenable` is `true`, that means we should open the substreams system again
// after all the substreams are closed.
if reenable && shutdown.is_empty() {
self.state = ProtocolState::Opening;
} else {
self.state = ProtocolState::Disabled { shutdown, reenable };
}
None
}
ProtocolState::KillAsap => None,
}
}
}
impl ProtocolsHandler for LegacyProtoHandler {
type InEvent = LegacyProtoHandlerIn;
type OutEvent = LegacyProtoHandlerOut;
type Error = ConnectionKillError;
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = Infallible;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.protocol.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
(mut substream, received_handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
ProtocolState::Poisoned
}
ProtocolState::Init { mut substreams, init_deadline } => {
if substream.endpoint() == Endpoint::Dialer {
error!(target: "sub-libp2p", "Opened dialing substream with {:?} before \
initialization", self.remote_peer_id);
}
substreams.push((substream, received_handshake));
ProtocolState::Init { substreams, init_deadline }
}
ProtocolState::Opening { .. } => {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: substream.protocol_version(),
received_handshake,
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: smallvec![substream],
shutdown: SmallVec::new()
}
}
ProtocolState::Normal { substreams: mut existing, shutdown } => {
existing.push(substream);
ProtocolState::Normal { substreams: existing, shutdown }
}
ProtocolState::Disabled { mut shutdown, .. } => {
substream.shutdown();
shutdown.push(substream);
ProtocolState::Disabled { shutdown, reenable: false }
}
ProtocolState::KillAsap => ProtocolState::KillAsap,
};
}
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
unreachable: Self::OutboundOpenInfo
) {
match unreachable {}
}
fn inject_event(&mut self, message: LegacyProtoHandlerIn) {
match message {
LegacyProtoHandlerIn::Disable => self.disable(),
LegacyProtoHandlerIn::Enable => self.enable(),
}
}
fn inject_dial_upgrade_error(
&mut self,
unreachable: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<io::Error>
) {
match unreachable {}
}
fn connection_keep_alive(&self) -> KeepAlive {
match self.state {
ProtocolState::Init { .. } | ProtocolState::Normal { .. } => KeepAlive::Yes,
ProtocolState::Opening { .. } | ProtocolState::Disabled { .. } |
ProtocolState::Poisoned | ProtocolState::KillAsap => KeepAlive::No,
}
}
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
// Flush the events queue if necessary.
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
// Kill the connection if needed.
if let ProtocolState::KillAsap = self.state {
return Poll::Ready(ProtocolsHandlerEvent::Close(ConnectionKillError));
}
// Process all the substreams.
if let Some(event) = self.poll_state(cx) {
return Poll::Ready(event)
}
Poll::Pending
}
}
impl fmt::Debug for LegacyProtoHandler {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("LegacyProtoHandler")
.finish()
}
}
/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
/// shut down are removed from the list.
fn shutdown_list
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<NegotiatedSubstream>>>,
cx: &mut Context)
{
'outer: for n in (0..list.len()).rev() {
let mut substream = list.swap_remove(n);
loop {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(_))) => {}
Poll::Pending => break,
Poll::Ready(Some(Err(_))) | Poll::Ready(None) => continue 'outer,
}
}
list.push(substream);
}
}
/// Error returned when switching from normal to disabled.
#[derive(Debug)]
pub struct ConnectionKillError;
impl error::Error for ConnectionKillError {
}
impl fmt::Display for ConnectionKillError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Connection kill when switching from normal to disabled")
}
}
@@ -1,293 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for ingoing
//! substreams for a single gossiping protocol.
//!
//! > **Note**: Each instance corresponds to a single protocol. In order to support multiple
//! > protocols, you need to create multiple instances and group them.
//!
use crate::protocol::generic_proto::upgrade::{NotificationsIn, NotificationsInSubstream};
use bytes::BytesMut;
use futures::prelude::*;
use libp2p::core::{ConnectedPoint, PeerId};
use libp2p::core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
KeepAlive,
ProtocolsHandlerUpgrErr,
SubstreamProtocol,
NegotiatedSubstream,
};
use log::{error, warn};
use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
/// it is turned into a [`NotifsInHandler`].
pub struct NotifsInHandlerProto {
/// Configuration for the protocol upgrade to negotiate.
in_protocol: NotificationsIn,
}
/// The actual handler once the connection has been established.
pub struct NotifsInHandler {
/// Configuration for the protocol upgrade to negotiate for inbound substreams.
in_protocol: NotificationsIn,
/// Substream that is open with the remote.
substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// If the substream is opened and closed rapidly, we can emit several `OpenRequest` and
/// `Closed` messages in a row without the handler having time to respond with `Accept` or
/// `Refuse`.
///
/// In order to keep the state consistent, we increment this variable every time an
/// `OpenRequest` is emitted and decrement it every time an `Accept` or `Refuse` is received.
pending_accept_refuses: usize,
/// Queue of events to send to the outside.
///
/// This queue is only ever modified to insert elements at the back, or remove the first
/// element.
events_queue: VecDeque<ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>>,
}
/// Event that can be received by a `NotifsInHandler`.
#[derive(Debug, Clone)]
pub enum NotifsInHandlerIn {
/// Can be sent back as a response to an `OpenRequest`. Contains the status message to send
/// to the remote.
///
/// After sending this to the handler, the substream is now considered open and `Notif` events
/// can be received.
Accept(Vec<u8>),
/// Can be sent back as a response to an `OpenRequest`.
Refuse,
}
/// Event that can be emitted by a `NotifsInHandler`.
#[derive(Debug)]
pub enum NotifsInHandlerOut {
/// The remote wants to open a substream. Contains the initial message sent by the remote
/// when the substream has been opened.
///
/// Every time this event is emitted, a corresponding `Accepted` or `Refused` **must** be sent
/// back even if a `Closed` is received.
OpenRequest(Vec<u8>),
/// The notifications substream has been closed by the remote. In order to avoid race
/// conditions, this does **not** cancel any previously-sent `OpenRequest`.
Closed,
/// Received a message on the notifications substream.
///
/// Can only happen after an `Accept` and before a `Closed`.
Notif(BytesMut),
}
impl NotifsInHandlerProto {
/// Builds a new `NotifsInHandlerProto`.
pub fn new(
protocol_name: impl Into<Cow<'static, str>>
) -> Self {
NotifsInHandlerProto {
in_protocol: NotificationsIn::new(protocol_name),
}
}
}
impl IntoProtocolsHandler for NotifsInHandlerProto {
type Handler = NotifsInHandler;
fn inbound_protocol(&self) -> NotificationsIn {
self.in_protocol.clone()
}
fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler {
NotifsInHandler {
in_protocol: self.in_protocol,
substream: None,
pending_accept_refuses: 0,
events_queue: VecDeque::new(),
}
}
}
impl NotifsInHandler {
/// Returns the name of the protocol that we accept.
pub fn protocol_name(&self) -> &Cow<'static, str> {
self.in_protocol.protocol_name()
}
/// Equivalent to the `poll` method of `ProtocolsHandler`, except that it is guaranteed to
/// never generate [`NotifsInHandlerOut::Notif`].
///
/// Use this method in situations where it is not desirable to receive events but still
/// necessary to drive any potential incoming handshake or request.
pub fn poll_process(
&mut self,
cx: &mut Context
) -> Poll<
ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>
> {
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) {
None | Some(Poll::Pending) => {},
Some(Poll::Ready(Ok(v))) => match v {},
Some(Poll::Ready(Err(_))) => {
self.substream = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
},
}
Poll::Pending
}
}
impl ProtocolsHandler for NotifsInHandler {
type InEvent = NotifsInHandlerIn;
type OutEvent = NotifsInHandlerOut;
type Error = void::Void;
type InboundProtocol = NotificationsIn;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.in_protocol.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
(msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
// If a substream already exists, we drop it and replace it with the new incoming one.
if self.substream.is_some() {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
}
// Note that we drop the existing substream, which will send an equivalent to a TCP "RST"
// to the remote and force-close the substream. It might seem like an unclean way to get
// rid of a substream. However, keep in mind that it is invalid for the remote to open
// multiple such substreams, and therefore sending a "RST" is not an incorrect thing to do.
self.substream = Some(proto);
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg)));
self.pending_accept_refuses = self.pending_accept_refuses
.checked_add(1)
.unwrap_or_else(|| {
error!(target: "sub-libp2p", "Overflow in pending_accept_refuses");
usize::max_value()
});
}
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) {
// We never emit any outgoing substream.
void::unreachable(out)
}
fn inject_event(&mut self, message: NotifsInHandlerIn) {
self.pending_accept_refuses = match self.pending_accept_refuses.checked_sub(1) {
Some(v) => v,
None => {
error!(
target: "sub-libp2p",
"Inconsistent state: received Accept/Refuse when no pending request exists"
);
return;
}
};
// If we send multiple `OpenRequest`s in a row, we will receive back multiple
// `Accept`/`Refuse` messages. All of them are obsolete except the last one.
if self.pending_accept_refuses != 0 {
return;
}
match (message, self.substream.as_mut()) {
(NotifsInHandlerIn::Accept(message), Some(sub)) => sub.send_handshake(message),
(NotifsInHandlerIn::Accept(_), None) => {},
(NotifsInHandlerIn::Refuse, _) => self.substream = None,
}
}
fn inject_dial_upgrade_error(&mut self, _: (), _: ProtocolsHandlerUpgrErr<void::Void>) {
error!(target: "sub-libp2p", "Received dial upgrade error in inbound-only handler");
}
fn connection_keep_alive(&self) -> KeepAlive {
if self.substream.is_some() {
KeepAlive::Yes
} else {
KeepAlive::No
}
}
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
> {
// Flush the events queue if necessary.
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
match self.substream.as_mut().map(|s| Stream::poll_next(Pin::new(s), cx)) {
None | Some(Poll::Pending) => {},
Some(Poll::Ready(Some(Ok(msg)))) => {
if self.pending_accept_refuses != 0 {
warn!(
target: "sub-libp2p",
"Bad state in inbound-only handler: notif before accepting substream"
);
}
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(msg)))
},
Some(Poll::Ready(None)) | Some(Poll::Ready(Some(Err(_)))) => {
self.substream = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
},
}
Poll::Pending
}
}
impl fmt::Debug for NotifsInHandler {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("NotifsInHandler")
.field("substream_open", &self.substream.is_some())
.finish()
}
}
@@ -1,444 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for outgoing
//! substreams of a single gossiping protocol.
//!
//! > **Note**: Each instance corresponds to a single protocol. In order to support multiple
//! > protocols, you need to create multiple instances and group them.
//!
use crate::protocol::generic_proto::upgrade::{NotificationsOut, NotificationsOutSubstream, NotificationsHandshakeError};
use futures::prelude::*;
use libp2p::core::{ConnectedPoint, PeerId};
use libp2p::core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
KeepAlive,
ProtocolsHandlerUpgrErr,
SubstreamProtocol,
NegotiatedSubstream,
};
use log::{debug, warn, error};
use std::{
borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll, Waker},
time::Duration
};
use wasm_timer::Instant;
/// Maximum duration to open a substream and receive the handshake message. After that, we
/// consider that we failed to open the substream.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);
/// After successfully establishing a connection with the remote, we keep the connection open for
/// at least this amount of time in order to give the rest of the code the chance to notify us to
/// open substreams.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
/// Every time a connection with a remote starts, an instance of this struct is created and
/// sent to a background task dedicated to this connection. Once the connection is established,
/// it is turned into a [`NotifsOutHandler`].
///
/// See the documentation of [`NotifsOutHandler`] for more information.
pub struct NotifsOutHandlerProto {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, str>,
}
impl NotifsOutHandlerProto {
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream.
pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
NotifsOutHandlerProto {
protocol_name: protocol_name.into(),
}
}
}
impl IntoProtocolsHandler for NotifsOutHandlerProto {
type Handler = NotifsOutHandler;
fn inbound_protocol(&self) -> DeniedUpgrade {
DeniedUpgrade
}
fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler {
NotifsOutHandler {
protocol_name: self.protocol_name,
when_connection_open: Instant::now(),
state: State::Disabled,
events_queue: VecDeque::new(),
}
}
}
/// Handler for an outbound notification substream.
///
/// When a connection is established, this handler starts in the "disabled" state, meaning that
/// no substream will be open.
///
/// One can try open a substream by sending an [`NotifsOutHandlerIn::Enable`] message to the
/// handler. Once done, the handler will try to establish then maintain an outbound substream with
/// the remote for the purpose of sending notifications to it.
pub struct NotifsOutHandler {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, str>,
/// Relationship with the node we're connected to.
state: State,
/// When the connection with the remote has been successfully established.
when_connection_open: Instant,
/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: VecDeque<ProtocolsHandlerEvent<NotificationsOut, (), NotifsOutHandlerOut, void::Void>>,
}
/// Our relationship with the node we're connected to.
enum State {
/// The handler is disabled and idle. No substream is open.
Disabled,
/// The handler is disabled. A substream is still open and needs to be closed.
///
/// > **Important**: Having this state means that `poll_close` has been called at least once,
/// > but the `Sink` API is unclear about whether or not the stream can then
/// > be recovered. Because of that, we must never switch from the
/// > `DisabledOpen` state to the `Open` state while keeping the same substream.
DisabledOpen(NotificationsOutSubstream<NegotiatedSubstream>),
/// The handler is disabled but we are still trying to open a substream with the remote.
///
/// If the handler gets enabled again, we can immediately switch to `Opening`.
DisabledOpening,
/// The handler is enabled and we are trying to open a substream with the remote.
Opening {
/// The initial message that we sent. Necessary if we need to re-open a substream.
initial_message: Vec<u8>,
},
/// The handler is enabled. We have tried opening a substream in the past but the remote
/// refused it.
Refused,
/// The handler is enabled and substream is open.
Open {
/// Substream that is currently open.
substream: NotificationsOutSubstream<NegotiatedSubstream>,
/// Waker for the last task that got `Poll::Pending` from `poll_ready`, to notify
/// when the open substream closes due to being disabled or encountering an
/// error, i.e. to notify the task as soon as the substream becomes unavailable,
/// without waiting for an underlying I/O task wakeup.
close_waker: Option<Waker>,
/// The initial message that we sent. Necessary if we need to re-open a substream.
initial_message: Vec<u8>,
},
/// Poisoned state. Shouldn't be found in the wild.
Poisoned,
}
/// Event that can be received by a `NotifsOutHandler`.
#[derive(Debug)]
pub enum NotifsOutHandlerIn {
/// Enables the notifications substream for this node. The handler will try to maintain a
/// substream with the remote.
Enable {
/// Initial message to send to remote nodes when we open substreams.
initial_message: Vec<u8>,
},
/// Disables the notifications substream for this node. This is the default state.
Disable,
}
/// Event that can be emitted by a `NotifsOutHandler`.
#[derive(Debug)]
pub enum NotifsOutHandlerOut {
/// The notifications substream has been accepted by the remote.
Open {
/// Handshake message sent by the remote after we opened the substream.
handshake: Vec<u8>,
},
/// The notifications substream has been closed by the remote.
Closed,
/// We tried to open a notifications substream, but the remote refused it.
///
/// Can only happen if we're in a closed state.
Refused,
}
impl NotifsOutHandler {
/// Returns true if the substream is currently open.
pub fn is_open(&self) -> bool {
match &self.state {
State::Disabled => false,
State::DisabledOpening => false,
State::DisabledOpen(_) => true,
State::Opening { .. } => false,
State::Refused => false,
State::Open { .. } => true,
State::Poisoned => false,
}
}
/// Returns `true` if there has been an attempt to open the substream, but the remote refused
/// the substream.
///
/// Always returns `false` if the handler is in a disabled state.
pub fn is_refused(&self) -> bool {
match &self.state {
State::Disabled => false,
State::DisabledOpening => false,
State::DisabledOpen(_) => false,
State::Opening { .. } => false,
State::Refused => true,
State::Open { .. } => false,
State::Poisoned => false,
}
}
/// Returns the name of the protocol that we negotiate.
pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name
}
/// Polls whether the outbound substream is ready to send a notification.
///
/// - Returns `Poll::Pending` if the substream is open but not ready to send a notification.
/// - Returns `Poll::Ready(true)` if the substream is ready to send a notification.
/// - Returns `Poll::Ready(false)` if the substream is closed.
///
pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<bool> {
if let State::Open { substream, close_waker, .. } = &mut self.state {
match substream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => Poll::Ready(true),
Poll::Ready(Err(_)) => Poll::Ready(false),
Poll::Pending => {
*close_waker = Some(cx.waker().clone());
Poll::Pending
}
}
} else {
Poll::Ready(false)
}
}
/// Sends out a notification.
///
/// If the substream is closed, or not ready to send out a notification yet, then the
/// notification is silently discarded.
///
/// You are encouraged to call [`NotifsOutHandler::poll_ready`] beforehand to determine
/// whether this will succeed. If `Poll::Ready(true)` is returned, then this method will send
/// out a notification.
pub fn send_or_discard(&mut self, notification: Vec<u8>) {
if let State::Open { substream, .. } = &mut self.state {
let _ = substream.start_send_unpin(notification);
}
}
}
impl ProtocolsHandler for NotifsOutHandler {
type InEvent = NotifsOutHandlerIn;
type OutEvent = NotifsOutHandlerOut;
type Error = void::Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = NotificationsOut;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(DeniedUpgrade, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
// We should never reach here. `proto` is a `Void`.
void::unreachable(proto)
}
fn inject_fully_negotiated_outbound(
&mut self,
(handshake_msg, substream): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: ()
) {
match mem::replace(&mut self.state, State::Poisoned) {
State::Opening { initial_message } => {
let ev = NotifsOutHandlerOut::Open { handshake: handshake_msg };
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev));
self.state = State::Open { substream, initial_message, close_waker: None };
},
// If the handler was disabled while we were negotiating the protocol, immediately
// close it.
State::DisabledOpening => self.state = State::DisabledOpen(substream),
// Any other situation should never happen.
State::Disabled | State::Refused | State::Open { .. } | State::DisabledOpen(_) =>
error!("☎️ State mismatch in notifications handler: substream already open"),
State::Poisoned => error!("☎️ Notifications handler in a poisoned state"),
}
}
fn inject_event(&mut self, message: NotifsOutHandlerIn) {
match message {
NotifsOutHandlerIn::Enable { initial_message } => {
match mem::replace(&mut self.state, State::Poisoned) {
State::Disabled => {
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
self.state = State::Opening { initial_message };
},
State::DisabledOpening => self.state = State::Opening { initial_message },
State::DisabledOpen(mut sub) => {
// As documented above, in this state we have already called `poll_close`
// once on the substream, and it is unclear whether the substream can then
// be recovered. When in doubt, let's drop the existing substream and
// open a new one.
if sub.close().now_or_never().is_none() {
warn!(
target: "sub-libp2p",
"📞 Improperly closed outbound notifications substream"
);
}
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
self.state = State::Opening { initial_message };
},
st @ State::Opening { .. } | st @ State::Refused | st @ State::Open { .. } => {
debug!(target: "sub-libp2p",
"Tried to enable notifications handler that was already enabled");
self.state = st;
}
State::Poisoned => error!("Notifications handler in a poisoned state"),
}
}
NotifsOutHandlerIn::Disable => {
match mem::replace(&mut self.state, State::Poisoned) {
st @ State::Disabled | st @ State::DisabledOpen(_) | st @ State::DisabledOpening => {
debug!(target: "sub-libp2p",
"Tried to disable notifications handler that was already disabled");
self.state = st;
}
State::Opening { .. } => self.state = State::DisabledOpening,
State::Refused => self.state = State::Disabled,
State::Open { substream, close_waker, .. } => {
if let Some(close_waker) = close_waker {
close_waker.wake();
}
self.state = State::DisabledOpen(substream)
},
State::Poisoned => error!("☎️ Notifications handler in a poisoned state"),
}
}
}
}
fn inject_dial_upgrade_error(&mut self, _: (), _: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>) {
match mem::replace(&mut self.state, State::Poisoned) {
State::Disabled => {},
State::DisabledOpen(_) | State::Refused | State::Open { .. } =>
error!("☎️ State mismatch in NotificationsOut"),
State::Opening { .. } => {
self.state = State::Refused;
let ev = NotifsOutHandlerOut::Refused;
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev));
},
State::DisabledOpening => self.state = State::Disabled,
State::Poisoned => error!("☎️ Notifications handler in a poisoned state"),
}
}
fn connection_keep_alive(&self) -> KeepAlive {
match self.state {
// We have a small grace period of `INITIAL_KEEPALIVE_TIME` during which we keep the
// connection open no matter what, in order to avoid closing and reopening
// connections all the time.
State::Disabled | State::DisabledOpen(_) | State::DisabledOpening =>
KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME),
State::Opening { .. } | State::Open { .. } => KeepAlive::Yes,
State::Refused | State::Poisoned => KeepAlive::No,
}
}
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
// Flush the events queue if necessary.
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
match &mut self.state {
State::Open { substream, initial_message, close_waker } =>
match Sink::poll_flush(Pin::new(substream), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
if let Some(close_waker) = close_waker.take() {
close_waker.wake();
}
// We try to re-open a substream.
let initial_message = mem::replace(initial_message, Vec::new());
self.state = State::Opening { initial_message: initial_message.clone() };
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed));
}
},
State::DisabledOpen(sub) => match Sink::poll_close(Pin::new(sub), cx) {
Poll::Pending => {},
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => {
self.state = State::Disabled;
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed));
},
},
_ => {}
}
Poll::Pending
}
}
impl fmt::Debug for NotifsOutHandler {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("NotifsOutHandler")
.field("open", &self.is_open())
.finish()
}
}
@@ -20,7 +20,7 @@ use crate::config::ProtocolId;
use bytes::BytesMut;
use futures::prelude::*;
use futures_codec::Framed;
use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use parking_lot::RwLock;
use std::{collections::VecDeque, io, pin::Pin, sync::Arc, vec::IntoIter as VecIntoIter};
use std::task::{Context, Poll};
@@ -85,34 +85,18 @@ impl Clone for RegisteredProtocol {
pub struct RegisteredProtocolSubstream<TSubstream> {
/// If true, we are in the process of closing the sink.
is_closing: bool,
/// Whether the local node opened this substream (dialer), or we received this substream from
/// the remote (listener).
endpoint: Endpoint,
/// Buffer of packets to send.
send_queue: VecDeque<BytesMut>,
/// If true, we should call `poll_complete` on the inner sink.
requires_poll_flush: bool,
/// The underlying substream.
inner: stream::Fuse<Framed<TSubstream, UviBytes<BytesMut>>>,
/// Version of the protocol that was negotiated.
protocol_version: u8,
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
}
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
/// Returns the version of the protocol that was negotiated.
pub fn protocol_version(&self) -> u8 {
self.protocol_version
}
/// Returns whether the local node opened this substream (dialer), or we received this
/// substream from the remote (listener).
pub fn endpoint(&self) -> Endpoint {
self.endpoint
}
/// Starts a graceful shutdown process on this substream.
///
/// Note that "graceful" means that we sent a closing message. We don't wait for any
@@ -246,7 +230,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
fn upgrade_inbound(
self,
socket: TSubstream,
info: Self::Info,
_: Self::Info,
) -> Self::Future {
Box::pin(async move {
let mut framed = {
@@ -262,11 +246,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
Ok((RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Listener,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
}, received_handshake.to_vec()))
})
@@ -283,7 +265,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
fn upgrade_outbound(
self,
socket: TSubstream,
info: Self::Info,
_: Self::Info,
) -> Self::Future {
Box::pin(async move {
let mut framed = {
@@ -301,11 +283,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
Ok((RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Dialer,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
}, received_handshake.to_vec()))
})