mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 03:01:07 +00:00
Rename GenericProto to Notifications (#8415)
* Rename GenericProto to Notifications * Small comment fix
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,830 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! Implementations of the `IntoProtocolsHandler` and `ProtocolsHandler` traits for both incoming
|
||||
//! and outgoing substreams for all gossiping protocols.
|
||||
//!
|
||||
//! This is the main implementation of `ProtocolsHandler` in this crate, that handles all the
|
||||
//! gossiping protocols that are Substrate-related and outside of the scope of libp2p.
|
||||
//!
|
||||
//! # Usage
|
||||
//!
|
||||
//! From an API perspective, for each of its protocols, the [`NotifsHandler`] is always in one of
|
||||
//! the following state (see [`State`]):
|
||||
//!
|
||||
//! - Closed substream. This is the initial state.
|
||||
//! - Closed substream, but remote desires them to be open.
|
||||
//! - Open substream.
|
||||
//! - Open substream, but remote desires them to be closed.
|
||||
//!
|
||||
//! Each protocol in the [`NotifsHandler`] can spontaneously switch between these states:
|
||||
//!
|
||||
//! - "Closed substream" to "Closed substream but open desired". When that happens, a
|
||||
//! [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted.
|
||||
//! - "Closed substream but open desired" to "Closed substream" (i.e. the remote has cancelled
|
||||
//! their request). When that happens, a [`NotifsHandlerOut::CloseDesired`] is emitted.
|
||||
//! - "Open substream" to "Open substream but close desired". When that happens, a
|
||||
//! [`NotifsHandlerOut::CloseDesired`] is emitted.
|
||||
//!
|
||||
//! The user can instruct a protocol in the `NotifsHandler` to switch from "closed" to "open" or
|
||||
//! vice-versa by sending either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. The
|
||||
//! `NotifsHandler` must answer with [`NotifsHandlerOut::OpenResultOk`] or
|
||||
//! [`NotifsHandlerOut::OpenResultErr`], or with [`NotifsHandlerOut::CloseResult`].
|
||||
//!
|
||||
//! When a [`NotifsHandlerOut::OpenResultOk`] is emitted, the substream is now in the open state.
|
||||
//! When a [`NotifsHandlerOut::OpenResultErr`] or [`NotifsHandlerOut::CloseResult`] is emitted,
|
||||
//! the `NotifsHandler` is now (or remains) in the closed state.
|
||||
//!
|
||||
//! When a [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted, the user should always send back
|
||||
//! either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`].If this isn't done, the
|
||||
//! remote will be left in a pending state.
|
||||
//!
|
||||
//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
|
||||
//! [`NotifsHandlerIn::Open`] has gotten an answer.
|
||||
|
||||
use crate::protocol::notifications::{
|
||||
upgrade::{
|
||||
NotificationsIn, NotificationsOut, NotificationsInSubstream, NotificationsOutSubstream,
|
||||
NotificationsHandshakeError, UpgradeCollec
|
||||
},
|
||||
};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use libp2p::core::{ConnectedPoint, PeerId, upgrade::{InboundUpgrade, OutboundUpgrade}};
|
||||
use libp2p::swarm::{
|
||||
ProtocolsHandler, ProtocolsHandlerEvent,
|
||||
IntoProtocolsHandler,
|
||||
KeepAlive,
|
||||
ProtocolsHandlerUpgrErr,
|
||||
SubstreamProtocol,
|
||||
NegotiatedSubstream,
|
||||
};
|
||||
use futures::{
|
||||
channel::mpsc,
|
||||
lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
|
||||
prelude::*
|
||||
};
|
||||
use log::error;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use std::{borrow::Cow, collections::VecDeque, mem, pin::Pin, str, sync::Arc, task::{Context, Poll}, time::Duration};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
/// Number of pending notifications in asynchronous contexts.
|
||||
/// See [`NotificationsSink::reserve_notification`] for context.
|
||||
const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;
|
||||
|
||||
/// Number of pending notifications in synchronous contexts.
|
||||
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
|
||||
|
||||
/// Maximum duration to open a substream and receive the handshake message. After that, we
|
||||
/// consider that we failed to open the substream.
|
||||
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// After successfully establishing a connection with the remote, we keep the connection open for
|
||||
/// at least this amount of time in order to give the rest of the code the chance to notify us to
|
||||
/// open substreams.
|
||||
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Implements the `IntoProtocolsHandler` trait of libp2p.
|
||||
///
|
||||
/// Every time a connection with a remote starts, an instance of this struct is created and
|
||||
/// sent to a background task dedicated to this connection. Once the connection is established,
|
||||
/// it is turned into a [`NotifsHandler`].
|
||||
///
|
||||
/// See the documentation at the module level for more information.
|
||||
pub struct NotifsHandlerProto {
|
||||
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
|
||||
/// send or respond with in the handshake.
|
||||
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,
|
||||
}
|
||||
|
||||
/// The actual handler once the connection has been established.
|
||||
///
|
||||
/// See the documentation at the module level for more information.
|
||||
pub struct NotifsHandler {
|
||||
/// List of notification protocols, specified by the user at initialization.
|
||||
protocols: Vec<Protocol>,
|
||||
|
||||
/// When the connection with the remote has been successfully established.
|
||||
when_connection_open: Instant,
|
||||
|
||||
/// Whether we are the connection dialer or listener.
|
||||
endpoint: ConnectedPoint,
|
||||
|
||||
/// Remote we are connected to.
|
||||
peer_id: PeerId,
|
||||
|
||||
/// Events to return in priority from `poll`.
|
||||
events_queue: VecDeque<
|
||||
ProtocolsHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>
|
||||
>,
|
||||
}
|
||||
|
||||
/// Fields specific for each individual protocol.
|
||||
struct Protocol {
|
||||
/// Name of the protocol.
|
||||
name: Cow<'static, str>,
|
||||
|
||||
/// Prototype for the inbound upgrade.
|
||||
in_upgrade: NotificationsIn,
|
||||
|
||||
/// Handshake to send when opening a substream or receiving an open request.
|
||||
handshake: Arc<RwLock<Vec<u8>>>,
|
||||
|
||||
/// Maximum allowed size of individual notifications.
|
||||
max_notification_size: u64,
|
||||
|
||||
/// Current state of the substreams for this protocol.
|
||||
state: State,
|
||||
}
|
||||
|
||||
/// See the module-level documentation to learn about the meaning of these variants.
|
||||
enum State {
|
||||
/// Protocol is in the "Closed" state.
|
||||
Closed {
|
||||
/// True if an outgoing substream is still in the process of being opened.
|
||||
pending_opening: bool,
|
||||
},
|
||||
|
||||
/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
|
||||
/// emitted.
|
||||
OpenDesiredByRemote {
|
||||
/// Substream opened by the remote and that hasn't been accepted/rejected yet.
|
||||
in_substream: NotificationsInSubstream<NegotiatedSubstream>,
|
||||
|
||||
/// See [`State::Closed::pending_opening`].
|
||||
pending_opening: bool,
|
||||
},
|
||||
|
||||
/// Protocol is in the "Closed" state, but has received a [`NotifsHandlerIn::Open`] and is
|
||||
/// consequently trying to open the various notifications substreams.
|
||||
///
|
||||
/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
|
||||
/// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`].
|
||||
Opening {
|
||||
/// Substream opened by the remote. If `Some`, has been accepted.
|
||||
in_substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
|
||||
},
|
||||
|
||||
/// Protocol is in the "Open" state.
|
||||
Open {
|
||||
/// Contains the two `Receiver`s connected to the [`NotificationsSink`] that has been
|
||||
/// sent out. The notifications to send out can be pulled from this receivers.
|
||||
/// We use two different channels in order to have two different channel sizes, but from
|
||||
/// the receiving point of view, the two channels are the same.
|
||||
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
|
||||
notifications_sink_rx: stream::Select<
|
||||
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
|
||||
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
|
||||
>,
|
||||
|
||||
/// Outbound substream that has been accepted by the remote.
|
||||
///
|
||||
/// Always `Some` on transition to [`State::Open`]. Switched to `None` only if the remote
|
||||
/// closed the substream. If `None`, a [`NotifsHandlerOut::CloseDesired`] event has been
|
||||
/// emitted.
|
||||
out_substream: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
|
||||
|
||||
/// Substream opened by the remote.
|
||||
///
|
||||
/// Contrary to the `out_substream` field, operations continue as normal even if the
|
||||
/// substream has been closed by the remote. A `None` is treated the same way as if there
|
||||
/// was an idle substream.
|
||||
in_substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl IntoProtocolsHandler for NotifsHandlerProto {
|
||||
type Handler = NotifsHandler;
|
||||
|
||||
fn inbound_protocol(&self) -> UpgradeCollec<NotificationsIn> {
|
||||
self.protocols.iter()
|
||||
.map(|(_, p, _, _)| p.clone())
|
||||
.collect::<UpgradeCollec<_>>()
|
||||
}
|
||||
|
||||
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
|
||||
NotifsHandler {
|
||||
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| {
|
||||
Protocol {
|
||||
name,
|
||||
in_upgrade,
|
||||
handshake,
|
||||
state: State::Closed {
|
||||
pending_opening: false,
|
||||
},
|
||||
max_notification_size: max_size,
|
||||
}
|
||||
}).collect(),
|
||||
peer_id: peer_id.clone(),
|
||||
endpoint: connected_point.clone(),
|
||||
when_connection_open: Instant::now(),
|
||||
events_queue: VecDeque::with_capacity(16),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Event that can be received by a `NotifsHandler`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum NotifsHandlerIn {
|
||||
/// Instruct the handler to open the notification substreams.
|
||||
///
|
||||
/// Must always be answered by a [`NotifsHandlerOut::OpenResultOk`] or a
|
||||
/// [`NotifsHandlerOut::OpenResultErr`] event.
|
||||
///
|
||||
/// Importantly, it is forbidden to send a [`NotifsHandlerIn::Open`] while a previous one is
|
||||
/// already in the fly. It is however possible if a `Close` is still in the fly.
|
||||
Open {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
},
|
||||
|
||||
/// Instruct the handler to close the notification substreams, or reject any pending incoming
|
||||
/// substream request.
|
||||
///
|
||||
/// Must always be answered by a [`NotifsHandlerOut::CloseResult`] event.
|
||||
Close {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
},
|
||||
}
|
||||
|
||||
/// Event that can be emitted by a `NotifsHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum NotifsHandlerOut {
|
||||
/// Acknowledges a [`NotifsHandlerIn::Open`].
|
||||
OpenResultOk {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
/// The endpoint of the connection that is open for custom protocols.
|
||||
endpoint: ConnectedPoint,
|
||||
/// Handshake that was sent to us.
|
||||
/// This is normally a "Status" message, but this out of the concern of this code.
|
||||
received_handshake: Vec<u8>,
|
||||
/// How notifications can be sent to this node.
|
||||
notifications_sink: NotificationsSink,
|
||||
},
|
||||
|
||||
/// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open
|
||||
/// notification substreams.
|
||||
OpenResultErr {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
},
|
||||
|
||||
/// Acknowledges a [`NotifsHandlerIn::Close`].
|
||||
CloseResult {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
},
|
||||
|
||||
/// The remote would like the substreams to be open. Send a [`NotifsHandlerIn::Open`] or a
|
||||
/// [`NotifsHandlerIn::Close`] in order to either accept or deny this request. If a
|
||||
/// [`NotifsHandlerIn::Open`] or [`NotifsHandlerIn::Close`] has been sent before and has not
|
||||
/// yet been acknowledged by a matching [`NotifsHandlerOut`], then you don't need to a send
|
||||
/// another [`NotifsHandlerIn`].
|
||||
OpenDesiredByRemote {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
},
|
||||
|
||||
/// The remote would like the substreams to be closed. Send a [`NotifsHandlerIn::Close`] in
|
||||
/// order to close them. If a [`NotifsHandlerIn::Close`] has been sent before and has not yet
|
||||
/// been acknowledged by a [`NotifsHandlerOut::CloseResult`], then you don't need to a send
|
||||
/// another one.
|
||||
CloseDesired {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
},
|
||||
|
||||
/// Received a message on a custom protocol substream.
|
||||
///
|
||||
/// Can only happen when the handler is in the open state.
|
||||
Notification {
|
||||
/// Index of the protocol in the list of protocols passed at initialization.
|
||||
protocol_index: usize,
|
||||
/// Message that has been received.
|
||||
message: BytesMut,
|
||||
},
|
||||
}
|
||||
|
||||
/// Sink connected directly to the node background task. Allows sending notifications to the peer.
|
||||
///
|
||||
/// Can be cloned in order to obtain multiple references to the substream of the same peer.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NotificationsSink {
|
||||
inner: Arc<NotificationsSinkInner>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NotificationsSinkInner {
|
||||
/// Target of the sink.
|
||||
peer_id: PeerId,
|
||||
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
|
||||
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
|
||||
/// Sender to use in synchronous contexts. Uses a synchronous mutex.
|
||||
/// This channel has a large capacity and is meant to be used in contexts where
|
||||
/// back-pressure cannot be properly exerted.
|
||||
/// It will be removed in a future version.
|
||||
sync_channel: Mutex<mpsc::Sender<NotificationsSinkMessage>>,
|
||||
}
|
||||
|
||||
/// Message emitted through the [`NotificationsSink`] and processed by the background task
|
||||
/// dedicated to the peer.
|
||||
#[derive(Debug)]
|
||||
enum NotificationsSinkMessage {
|
||||
/// Message emitted by [`NotificationsSink::reserve_notification`] and
|
||||
/// [`NotificationsSink::write_notification_now`].
|
||||
Notification {
|
||||
message: Vec<u8>,
|
||||
},
|
||||
|
||||
/// Must close the connection.
|
||||
ForceClose,
|
||||
}
|
||||
|
||||
impl NotificationsSink {
|
||||
/// Returns the [`PeerId`] the sink is connected to.
|
||||
pub fn peer_id(&self) -> &PeerId {
|
||||
&self.inner.peer_id
|
||||
}
|
||||
|
||||
/// Sends a notification to the peer.
|
||||
///
|
||||
/// If too many messages are already buffered, the notification is silently discarded and the
|
||||
/// connection to the peer will be closed shortly after.
|
||||
///
|
||||
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
|
||||
/// error to send a notification using an unknown protocol.
|
||||
///
|
||||
/// This method will be removed in a future version.
|
||||
pub fn send_sync_notification<'a>(
|
||||
&'a self,
|
||||
message: impl Into<Vec<u8>>
|
||||
) {
|
||||
let mut lock = self.inner.sync_channel.lock();
|
||||
let result = lock.try_send(NotificationsSinkMessage::Notification {
|
||||
message: message.into()
|
||||
});
|
||||
|
||||
if result.is_err() {
|
||||
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
|
||||
// buffer, and therefore `try_send` will succeed.
|
||||
let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose);
|
||||
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait until the remote is ready to accept a notification.
|
||||
///
|
||||
/// Returns an error in the case where the connection is closed.
|
||||
///
|
||||
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
|
||||
/// error to send a notification using an unknown protocol.
|
||||
pub async fn reserve_notification<'a>(&'a self) -> Result<Ready<'a>, ()> {
|
||||
let mut lock = self.inner.async_channel.lock().await;
|
||||
|
||||
let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
|
||||
if poll_ready.is_ok() {
|
||||
Ok(Ready { lock })
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Notification slot is reserved and the notification can actually be sent.
|
||||
#[must_use]
|
||||
#[derive(Debug)]
|
||||
pub struct Ready<'a> {
|
||||
/// Guarded channel. The channel inside is guaranteed to not be full.
|
||||
lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
|
||||
}
|
||||
|
||||
impl<'a> Ready<'a> {
|
||||
/// Consumes this slots reservation and actually queues the notification.
|
||||
///
|
||||
/// Returns an error if the substream has been closed.
|
||||
pub fn send(
|
||||
mut self,
|
||||
notification: impl Into<Vec<u8>>
|
||||
) -> Result<(), ()> {
|
||||
self.lock.start_send(NotificationsSinkMessage::Notification {
|
||||
message: notification.into(),
|
||||
}).map_err(|_| ())
|
||||
}
|
||||
}
|
||||
|
||||
/// Error specific to the collection of protocols.
|
||||
#[derive(Debug, derive_more::Display, derive_more::Error)]
|
||||
pub enum NotifsHandlerError {
|
||||
/// Channel of synchronous notifications is full.
|
||||
SyncNotificationsClogged,
|
||||
}
|
||||
|
||||
impl NotifsHandlerProto {
|
||||
/// Builds a new handler.
|
||||
///
|
||||
/// `list` is a list of notification protocols names, the message to send as part of the
|
||||
/// handshake, and the maximum allowed size of a notification. At the moment, the message
|
||||
/// is always the same whether we open a substream ourselves or respond to handshake from
|
||||
/// the remote.
|
||||
pub fn new(
|
||||
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
|
||||
) -> Self {
|
||||
let protocols = list
|
||||
.into()
|
||||
.into_iter()
|
||||
.map(|(proto_name, msg, max_notif_size)| {
|
||||
(proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size)
|
||||
})
|
||||
.collect();
|
||||
|
||||
NotifsHandlerProto {
|
||||
protocols,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProtocolsHandler for NotifsHandler {
|
||||
type InEvent = NotifsHandlerIn;
|
||||
type OutEvent = NotifsHandlerOut;
|
||||
type Error = NotifsHandlerError;
|
||||
type InboundProtocol = UpgradeCollec<NotificationsIn>;
|
||||
type OutboundProtocol = NotificationsOut;
|
||||
// Index within the `out_protocols`.
|
||||
type OutboundOpenInfo = usize;
|
||||
type InboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
|
||||
let protocols = self.protocols.iter()
|
||||
.map(|p| p.in_upgrade.clone())
|
||||
.collect::<UpgradeCollec<_>>();
|
||||
|
||||
SubstreamProtocol::new(protocols, ())
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
((_remote_handshake, mut new_substream), protocol_index):
|
||||
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
(): ()
|
||||
) {
|
||||
let mut protocol_info = &mut self.protocols[protocol_index];
|
||||
match protocol_info.state {
|
||||
State::Closed { pending_opening } => {
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::OpenDesiredByRemote {
|
||||
protocol_index,
|
||||
}
|
||||
));
|
||||
|
||||
protocol_info.state = State::OpenDesiredByRemote {
|
||||
in_substream: new_substream,
|
||||
pending_opening,
|
||||
};
|
||||
},
|
||||
State::OpenDesiredByRemote { .. } => {
|
||||
// If a substream already exists, silently drop the new one.
|
||||
// Note that we drop the substream, which will send an equivalent to a
|
||||
// TCP "RST" to the remote and force-close the substream. It might
|
||||
// seem like an unclean way to get rid of a substream. However, keep
|
||||
// in mind that it is invalid for the remote to open multiple such
|
||||
// substreams, and therefore sending a "RST" is the most correct thing
|
||||
// to do.
|
||||
return;
|
||||
},
|
||||
State::Opening { ref mut in_substream, .. } |
|
||||
State::Open { ref mut in_substream, .. } => {
|
||||
if in_substream.is_some() {
|
||||
// Same remark as above.
|
||||
return;
|
||||
}
|
||||
|
||||
// Create `handshake_message` on a separate line to be sure that the
|
||||
// lock is released as soon as possible.
|
||||
let handshake_message = protocol_info.handshake.read().clone();
|
||||
new_substream.send_handshake(handshake_message);
|
||||
*in_substream = Some(new_substream);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
(handshake, substream): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
protocol_index: Self::OutboundOpenInfo
|
||||
) {
|
||||
match self.protocols[protocol_index].state {
|
||||
State::Closed { ref mut pending_opening } |
|
||||
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
|
||||
debug_assert!(*pending_opening);
|
||||
*pending_opening = false;
|
||||
}
|
||||
State::Open { .. } => {
|
||||
error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
|
||||
debug_assert!(false);
|
||||
}
|
||||
State::Opening { ref mut in_substream } => {
|
||||
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
|
||||
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
|
||||
let notifications_sink = NotificationsSink {
|
||||
inner: Arc::new(NotificationsSinkInner {
|
||||
peer_id: self.peer_id.clone(),
|
||||
async_channel: FuturesMutex::new(async_tx),
|
||||
sync_channel: Mutex::new(sync_tx),
|
||||
}),
|
||||
};
|
||||
|
||||
self.protocols[protocol_index].state = State::Open {
|
||||
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()),
|
||||
out_substream: Some(substream),
|
||||
in_substream: in_substream.take(),
|
||||
};
|
||||
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::OpenResultOk {
|
||||
protocol_index,
|
||||
endpoint: self.endpoint.clone(),
|
||||
received_handshake: handshake,
|
||||
notifications_sink
|
||||
}
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, message: NotifsHandlerIn) {
|
||||
match message {
|
||||
NotifsHandlerIn::Open { protocol_index } => {
|
||||
let protocol_info = &mut self.protocols[protocol_index];
|
||||
match &mut protocol_info.state {
|
||||
State::Closed { pending_opening } => {
|
||||
if !*pending_opening {
|
||||
let proto = NotificationsOut::new(
|
||||
protocol_info.name.clone(),
|
||||
protocol_info.handshake.read().clone(),
|
||||
protocol_info.max_notification_size
|
||||
);
|
||||
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(proto, protocol_index)
|
||||
.with_timeout(OPEN_TIMEOUT),
|
||||
});
|
||||
}
|
||||
|
||||
protocol_info.state = State::Opening {
|
||||
in_substream: None,
|
||||
};
|
||||
},
|
||||
State::OpenDesiredByRemote { pending_opening, in_substream } => {
|
||||
let handshake_message = protocol_info.handshake.read().clone();
|
||||
|
||||
if !*pending_opening {
|
||||
let proto = NotificationsOut::new(
|
||||
protocol_info.name.clone(),
|
||||
handshake_message.clone(),
|
||||
protocol_info.max_notification_size,
|
||||
);
|
||||
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(proto, protocol_index)
|
||||
.with_timeout(OPEN_TIMEOUT),
|
||||
});
|
||||
}
|
||||
|
||||
in_substream.send_handshake(handshake_message);
|
||||
|
||||
// The state change is done in two steps because of borrowing issues.
|
||||
let in_substream = match
|
||||
mem::replace(&mut protocol_info.state, State::Opening { in_substream: None })
|
||||
{
|
||||
State::OpenDesiredByRemote { in_substream, .. } => in_substream,
|
||||
_ => unreachable!()
|
||||
};
|
||||
protocol_info.state = State::Opening {
|
||||
in_substream: Some(in_substream),
|
||||
};
|
||||
},
|
||||
State::Opening { .. } |
|
||||
State::Open { .. } => {
|
||||
// As documented, it is forbidden to send an `Open` while there is already
|
||||
// one in the fly.
|
||||
error!(target: "sub-libp2p", "opening already-opened handler");
|
||||
debug_assert!(false);
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
NotifsHandlerIn::Close { protocol_index } => {
|
||||
match self.protocols[protocol_index].state {
|
||||
State::Open { .. } => {
|
||||
self.protocols[protocol_index].state = State::Closed {
|
||||
pending_opening: false,
|
||||
};
|
||||
},
|
||||
State::Opening { .. } => {
|
||||
self.protocols[protocol_index].state = State::Closed {
|
||||
pending_opening: true,
|
||||
};
|
||||
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::OpenResultErr {
|
||||
protocol_index,
|
||||
}
|
||||
));
|
||||
},
|
||||
State::OpenDesiredByRemote { pending_opening, .. } => {
|
||||
self.protocols[protocol_index].state = State::Closed {
|
||||
pending_opening,
|
||||
};
|
||||
}
|
||||
State::Closed { .. } => {},
|
||||
}
|
||||
|
||||
self.events_queue.push_back(
|
||||
ProtocolsHandlerEvent::Custom(NotifsHandlerOut::CloseResult {
|
||||
protocol_index,
|
||||
})
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
num: usize,
|
||||
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
|
||||
) {
|
||||
match self.protocols[num].state {
|
||||
State::Closed { ref mut pending_opening } |
|
||||
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
|
||||
debug_assert!(*pending_opening);
|
||||
*pending_opening = false;
|
||||
}
|
||||
|
||||
State::Opening { .. } => {
|
||||
self.protocols[num].state = State::Closed {
|
||||
pending_opening: false,
|
||||
};
|
||||
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::OpenResultErr {
|
||||
protocol_index: num,
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
// No substream is being open when already `Open`.
|
||||
State::Open { .. } => debug_assert!(false),
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
// `Yes` if any protocol has some activity.
|
||||
if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
|
||||
return KeepAlive::Yes;
|
||||
}
|
||||
|
||||
// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
|
||||
// to express desire to open substreams.
|
||||
KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
) -> Poll<
|
||||
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
|
||||
> {
|
||||
if let Some(ev) = self.events_queue.pop_front() {
|
||||
return Poll::Ready(ev);
|
||||
}
|
||||
|
||||
for protocol_index in 0..self.protocols.len() {
|
||||
// Poll inbound substreams.
|
||||
// Inbound substreams being closed is always tolerated, except for the
|
||||
// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
|
||||
match &mut self.protocols[protocol_index].state {
|
||||
State::Closed { .. } |
|
||||
State::Open { in_substream: None, .. } |
|
||||
State::Opening { in_substream: None } => {}
|
||||
|
||||
State::Open { in_substream: in_substream @ Some(_), .. } => {
|
||||
match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) {
|
||||
Poll::Pending => {},
|
||||
Poll::Ready(Some(Ok(message))) => {
|
||||
let event = NotifsHandlerOut::Notification {
|
||||
protocol_index,
|
||||
message,
|
||||
};
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
|
||||
},
|
||||
Poll::Ready(None) | Poll::Ready(Some(Err(_))) =>
|
||||
*in_substream = None,
|
||||
}
|
||||
}
|
||||
|
||||
State::OpenDesiredByRemote { in_substream, pending_opening } => {
|
||||
match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
|
||||
Poll::Pending => {},
|
||||
Poll::Ready(Ok(void)) => match void {},
|
||||
Poll::Ready(Err(_)) => {
|
||||
self.protocols[protocol_index].state = State::Closed {
|
||||
pending_opening: *pending_opening,
|
||||
};
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::CloseDesired { protocol_index }
|
||||
))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
State::Opening { in_substream: in_substream @ Some(_), .. } => {
|
||||
match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) {
|
||||
Poll::Pending => {},
|
||||
Poll::Ready(Ok(void)) => match void {},
|
||||
Poll::Ready(Err(_)) => *in_substream = None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poll outbound substream.
|
||||
match &mut self.protocols[protocol_index].state {
|
||||
State::Open { out_substream: out_substream @ Some(_), .. } => {
|
||||
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
|
||||
Poll::Pending | Poll::Ready(Ok(())) => {},
|
||||
Poll::Ready(Err(_)) => {
|
||||
*out_substream = None;
|
||||
let event = NotifsHandlerOut::CloseDesired { protocol_index };
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
State::Closed { .. } |
|
||||
State::Opening { .. } |
|
||||
State::Open { out_substream: None, .. } |
|
||||
State::OpenDesiredByRemote { .. } => {}
|
||||
}
|
||||
|
||||
if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
|
||||
= &mut self.protocols[protocol_index].state
|
||||
{
|
||||
loop {
|
||||
// Before we poll the notifications sink receiver, check that the substream
|
||||
// is ready to accept a message.
|
||||
match out_substream.poll_ready_unpin(cx) {
|
||||
Poll::Ready(_) => {},
|
||||
Poll::Pending => break
|
||||
}
|
||||
|
||||
// Now that all substreams are ready for a message, grab what to send.
|
||||
let message = match notifications_sink_rx.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(msg)) => msg,
|
||||
Poll::Ready(None) | Poll::Pending => break,
|
||||
};
|
||||
|
||||
match message {
|
||||
NotificationsSinkMessage::Notification { message } => {
|
||||
let _ = out_substream.start_send_unpin(message);
|
||||
|
||||
// Calling `start_send_unpin` only queues the message. Actually
|
||||
// emitting the message is done with `poll_flush`. In order to
|
||||
// not introduce too much complexity, this flushing is done earlier
|
||||
// in the body of this `poll()` method. As such, we schedule a task
|
||||
// wake-up now in order to guarantee that `poll()` will be called
|
||||
// again and the flush happening.
|
||||
// At the time of the writing of this comment, a rewrite of this
|
||||
// code is being planned. If you find this comment in the wild and
|
||||
// the rewrite didn't happen, please consider a refactor.
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
NotificationsSinkMessage::ForceClose => {
|
||||
return Poll::Ready(
|
||||
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,319 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
#![cfg(test)]
|
||||
|
||||
use crate::protocol::notifications::{Notifications, NotificationsOut};
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::{PeerId, Multiaddr, Transport};
|
||||
use libp2p::core::{
|
||||
connection::{ConnectionId, ListenerId},
|
||||
ConnectedPoint,
|
||||
transport::MemoryTransport,
|
||||
upgrade
|
||||
};
|
||||
use libp2p::{identity, noise, yamux};
|
||||
use libp2p::swarm::{
|
||||
Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters,
|
||||
NetworkBehaviour, NetworkBehaviourAction
|
||||
};
|
||||
use std::{error, io, iter, task::{Context, Poll}, time::Duration};
|
||||
|
||||
/// Builds two nodes that have each other as bootstrap nodes.
|
||||
/// This is to be used only for testing, and a panic will happen if something goes wrong.
|
||||
fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
|
||||
let mut out = Vec::with_capacity(2);
|
||||
|
||||
let keypairs: Vec<_> = (0..2).map(|_| identity::Keypair::generate_ed25519()).collect();
|
||||
let addrs: Vec<Multiaddr> = (0..2)
|
||||
.map(|_| format!("/memory/{}", rand::random::<u64>()).parse().unwrap())
|
||||
.collect();
|
||||
|
||||
for index in 0 .. 2 {
|
||||
let keypair = keypairs[index].clone();
|
||||
|
||||
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
|
||||
.into_authentic(&keypair)
|
||||
.unwrap();
|
||||
|
||||
let transport = MemoryTransport
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||
.multiplex(yamux::YamuxConfig::default())
|
||||
.timeout(Duration::from_secs(20))
|
||||
.boxed();
|
||||
|
||||
let (peerset, _) = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig {
|
||||
sets: vec![
|
||||
sc_peerset::SetConfig {
|
||||
in_peers: 25,
|
||||
out_peers: 25,
|
||||
bootnodes: if index == 0 {
|
||||
keypairs
|
||||
.iter()
|
||||
.skip(1)
|
||||
.map(|keypair| keypair.public().into_peer_id())
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
},
|
||||
reserved_nodes: Default::default(),
|
||||
reserved_only: false,
|
||||
}
|
||||
],
|
||||
});
|
||||
|
||||
let behaviour = CustomProtoWithAddr {
|
||||
inner: Notifications::new(peerset, iter::once(("/foo".into(), Vec::new(), 1024 * 1024))),
|
||||
addrs: addrs
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(n, a)| if n != index {
|
||||
Some((keypairs[n].public().into_peer_id(), a.clone()))
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
let mut swarm = Swarm::new(
|
||||
transport,
|
||||
behaviour,
|
||||
keypairs[index].public().into_peer_id()
|
||||
);
|
||||
Swarm::listen_on(&mut swarm, addrs[index].clone()).unwrap();
|
||||
out.push(swarm);
|
||||
}
|
||||
|
||||
// Final output
|
||||
let mut out_iter = out.into_iter();
|
||||
let first = out_iter.next().unwrap();
|
||||
let second = out_iter.next().unwrap();
|
||||
(first, second)
|
||||
}
|
||||
|
||||
/// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it.
|
||||
struct CustomProtoWithAddr {
|
||||
inner: Notifications,
|
||||
addrs: Vec<(PeerId, Multiaddr)>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for CustomProtoWithAddr {
|
||||
type Target = Notifications;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for CustomProtoWithAddr {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkBehaviour for CustomProtoWithAddr {
|
||||
type ProtocolsHandler = <Notifications as NetworkBehaviour>::ProtocolsHandler;
|
||||
type OutEvent = <Notifications as NetworkBehaviour>::OutEvent;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
self.inner.new_handler()
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
let mut list = self.inner.addresses_of_peer(peer_id);
|
||||
for (p, a) in self.addrs.iter() {
|
||||
if p == peer_id {
|
||||
list.push(a.clone());
|
||||
}
|
||||
}
|
||||
list
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: &PeerId) {
|
||||
self.inner.inject_connected(peer_id)
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId) {
|
||||
self.inner.inject_disconnected(peer_id)
|
||||
}
|
||||
|
||||
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
||||
self.inner.inject_connection_established(peer_id, conn, endpoint)
|
||||
}
|
||||
|
||||
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
|
||||
self.inner.inject_connection_closed(peer_id, conn, endpoint)
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
connection: ConnectionId,
|
||||
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
|
||||
) {
|
||||
self.inner.inject_event(peer_id, connection, event)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
params: &mut impl PollParameters
|
||||
) -> Poll<
|
||||
NetworkBehaviourAction<
|
||||
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
|
||||
Self::OutEvent
|
||||
>
|
||||
> {
|
||||
self.inner.poll(cx, params)
|
||||
}
|
||||
|
||||
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
|
||||
self.inner.inject_addr_reach_failure(peer_id, addr, error)
|
||||
}
|
||||
|
||||
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||
self.inner.inject_dial_failure(peer_id)
|
||||
}
|
||||
|
||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_new_listen_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_expired_listen_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_new_external_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn error::Error + 'static)) {
|
||||
self.inner.inject_listener_error(id, err);
|
||||
}
|
||||
|
||||
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
|
||||
self.inner.inject_listener_closed(id, reason);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconnect_after_disconnect() {
|
||||
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
|
||||
// check that the disconnect worked, and finally check whether they successfully reconnect.
|
||||
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
// For this test, the services can be in the following states.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
enum ServiceState { NotConnected, FirstConnec, Disconnected, ConnectedAgain }
|
||||
let mut service1_state = ServiceState::NotConnected;
|
||||
let mut service2_state = ServiceState::NotConnected;
|
||||
|
||||
futures::executor::block_on(async move {
|
||||
loop {
|
||||
// Grab next event from services.
|
||||
let event = {
|
||||
let s1 = service1.next();
|
||||
let s2 = service2.next();
|
||||
futures::pin_mut!(s1, s2);
|
||||
match future::select(s1, s2).await {
|
||||
future::Either::Left((ev, _)) => future::Either::Left(ev),
|
||||
future::Either::Right((ev, _)) => future::Either::Right(ev),
|
||||
}
|
||||
};
|
||||
|
||||
match event {
|
||||
future::Either::Left(NotificationsOut::CustomProtocolOpen { .. }) => {
|
||||
match service1_state {
|
||||
ServiceState::NotConnected => {
|
||||
service1_state = ServiceState::FirstConnec;
|
||||
if service2_state == ServiceState::FirstConnec {
|
||||
service1.disconnect_peer(
|
||||
Swarm::local_peer_id(&service2),
|
||||
sc_peerset::SetId::from(0)
|
||||
);
|
||||
}
|
||||
},
|
||||
ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain,
|
||||
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
|
||||
}
|
||||
},
|
||||
future::Either::Left(NotificationsOut::CustomProtocolClosed { .. }) => {
|
||||
match service1_state {
|
||||
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
|
||||
ServiceState::ConnectedAgain| ServiceState::NotConnected |
|
||||
ServiceState::Disconnected => panic!(),
|
||||
}
|
||||
},
|
||||
future::Either::Right(NotificationsOut::CustomProtocolOpen { .. }) => {
|
||||
match service2_state {
|
||||
ServiceState::NotConnected => {
|
||||
service2_state = ServiceState::FirstConnec;
|
||||
if service1_state == ServiceState::FirstConnec {
|
||||
service1.disconnect_peer(
|
||||
Swarm::local_peer_id(&service2),
|
||||
sc_peerset::SetId::from(0)
|
||||
);
|
||||
}
|
||||
},
|
||||
ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain,
|
||||
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
|
||||
}
|
||||
},
|
||||
future::Either::Right(NotificationsOut::CustomProtocolClosed { .. }) => {
|
||||
match service2_state {
|
||||
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
|
||||
ServiceState::ConnectedAgain| ServiceState::NotConnected |
|
||||
ServiceState::Disconnected => panic!(),
|
||||
}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if service1_state == ServiceState::ConnectedAgain && service2_state == ServiceState::ConnectedAgain {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Now that the two services have disconnected and reconnected, wait for 3 seconds and
|
||||
// check whether they're still connected.
|
||||
let mut delay = futures_timer::Delay::new(Duration::from_secs(3));
|
||||
|
||||
loop {
|
||||
// Grab next event from services.
|
||||
let event = {
|
||||
let s1 = service1.next();
|
||||
let s2 = service2.next();
|
||||
futures::pin_mut!(s1, s2);
|
||||
match future::select(future::select(s1, s2), &mut delay).await {
|
||||
future::Either::Right(_) => break, // success
|
||||
future::Either::Left((future::Either::Left((ev, _)), _)) => ev,
|
||||
future::Either::Left((future::Either::Right((ev, _)), _)) => ev,
|
||||
}
|
||||
};
|
||||
|
||||
match event {
|
||||
NotificationsOut::CustomProtocolOpen { .. } |
|
||||
NotificationsOut::CustomProtocolClosed { .. } => panic!(),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
pub use self::collec::UpgradeCollec;
|
||||
pub use self::notifications::{
|
||||
NotificationsIn,
|
||||
NotificationsInSubstream,
|
||||
NotificationsOut,
|
||||
NotificationsOutSubstream,
|
||||
NotificationsHandshakeError,
|
||||
NotificationsOutError,
|
||||
};
|
||||
|
||||
mod collec;
|
||||
mod notifications;
|
||||
@@ -0,0 +1,95 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::upgrade::{InboundUpgrade, ProtocolName, UpgradeInfo};
|
||||
use std::{iter::FromIterator, pin::Pin, task::{Context, Poll}, vec};
|
||||
|
||||
// TODO: move this to libp2p => https://github.com/libp2p/rust-libp2p/issues/1445
|
||||
|
||||
/// Upgrade that combines multiple upgrades of the same type into one. Supports all the protocols
|
||||
/// supported by either sub-upgrade.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpgradeCollec<T>(pub Vec<T>);
|
||||
|
||||
impl<T> From<Vec<T>> for UpgradeCollec<T> {
|
||||
fn from(list: Vec<T>) -> Self {
|
||||
UpgradeCollec(list)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FromIterator<T> for UpgradeCollec<T> {
|
||||
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
|
||||
UpgradeCollec(iter.into_iter().collect())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: UpgradeInfo> UpgradeInfo for UpgradeCollec<T> {
|
||||
type Info = ProtoNameWithUsize<T::Info>;
|
||||
type InfoIter = vec::IntoIter<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
self.0.iter().enumerate()
|
||||
.flat_map(|(n, p)|
|
||||
p.protocol_info().into_iter().map(move |i| ProtoNameWithUsize(i, n)))
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C> InboundUpgrade<C> for UpgradeCollec<T>
|
||||
where
|
||||
T: InboundUpgrade<C>,
|
||||
{
|
||||
type Output = (T::Output, usize);
|
||||
type Error = (T::Error, usize);
|
||||
type Future = FutWithUsize<T::Future>;
|
||||
|
||||
fn upgrade_inbound(mut self, sock: C, info: Self::Info) -> Self::Future {
|
||||
let fut = self.0.remove(info.1).upgrade_inbound(sock, info.0);
|
||||
FutWithUsize(fut, info.1)
|
||||
}
|
||||
}
|
||||
|
||||
/// Groups a `ProtocolName` with a `usize`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProtoNameWithUsize<T>(T, usize);
|
||||
|
||||
impl<T: ProtocolName> ProtocolName for ProtoNameWithUsize<T> {
|
||||
fn protocol_name(&self) -> &[u8] {
|
||||
self.0.protocol_name()
|
||||
}
|
||||
}
|
||||
|
||||
/// Equivalent to `fut.map_ok(|v| (v, num)).map_err(|e| (e, num))`, where `fut` and `num` are
|
||||
/// the two fields of this struct.
|
||||
#[pin_project::pin_project]
|
||||
pub struct FutWithUsize<T>(#[pin] T, usize);
|
||||
|
||||
impl<T: Future<Output = Result<O, E>>, O, E> Future for FutWithUsize<T> {
|
||||
type Output = Result<(O, usize), (E, usize)>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
match Future::poll(this.0, cx) {
|
||||
Poll::Ready(Ok(v)) => Poll::Ready(Ok((v, *this.1))),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err((e, *this.1))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,624 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
/// Notifications protocol.
|
||||
///
|
||||
/// The Substrate notifications protocol consists in the following:
|
||||
///
|
||||
/// - Node A opens a substream to node B and sends a message which contains some protocol-specific
|
||||
/// higher-level logic. This message is prefixed with a variable-length integer message length.
|
||||
/// This message can be empty, in which case `0` is sent.
|
||||
/// - If node B accepts the substream, it sends back a message with the same properties.
|
||||
/// - If instead B refuses the connection (which typically happens because no empty slot is
|
||||
/// available), then it immediately closes the substream without sending back anything.
|
||||
/// - Node A can then send notifications to B, prefixed with a variable-length integer indicating
|
||||
/// the length of the message.
|
||||
/// - Either node A or node B can signal that it doesn't want this notifications substream anymore
|
||||
/// by closing its writing side. The other party should respond by also closing their own
|
||||
/// writing side soon after.
|
||||
///
|
||||
/// Notification substreams are unidirectional. If A opens a substream with B, then B is
|
||||
/// encouraged but not required to open a substream to A as well.
|
||||
///
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::prelude::*;
|
||||
use asynchronous_codec::Framed;
|
||||
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
|
||||
use log::error;
|
||||
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}};
|
||||
use unsigned_varint::codec::UviBytes;
|
||||
|
||||
/// Maximum allowed size of the two handshake messages, in bytes.
|
||||
const MAX_HANDSHAKE_SIZE: usize = 1024;
|
||||
|
||||
/// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional
|
||||
/// stream of messages.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NotificationsIn {
|
||||
/// Protocol name to use when negotiating the substream.
|
||||
protocol_name: Cow<'static, str>,
|
||||
/// Maximum allowed size for a single notification.
|
||||
max_notification_size: u64,
|
||||
}
|
||||
|
||||
/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
|
||||
/// message, then becomes a unidirectional sink of data.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NotificationsOut {
|
||||
/// Protocol name to use when negotiating the substream.
|
||||
protocol_name: Cow<'static, str>,
|
||||
/// Message to send when we start the handshake.
|
||||
initial_message: Vec<u8>,
|
||||
/// Maximum allowed size for a single notification.
|
||||
max_notification_size: u64,
|
||||
}
|
||||
|
||||
/// A substream for incoming notification messages.
|
||||
///
|
||||
/// When creating, this struct starts in a state in which we must first send back a handshake
|
||||
/// message to the remote. No message will come before this has been done.
|
||||
#[pin_project::pin_project]
|
||||
pub struct NotificationsInSubstream<TSubstream> {
|
||||
#[pin]
|
||||
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
|
||||
handshake: NotificationsInSubstreamHandshake,
|
||||
}
|
||||
|
||||
/// State of the handshake sending back process.
|
||||
enum NotificationsInSubstreamHandshake {
|
||||
/// Waiting for the user to give us the handshake message.
|
||||
NotSent,
|
||||
/// User gave us the handshake message. Trying to push it in the socket.
|
||||
PendingSend(Vec<u8>),
|
||||
/// Handshake message was pushed in the socket. Still need to flush.
|
||||
Flush,
|
||||
/// Handshake message successfully sent and flushed.
|
||||
Sent,
|
||||
/// Remote has closed their writing side. We close our own writing side in return.
|
||||
ClosingInResponseToRemote,
|
||||
/// Both our side and the remote have closed their writing side.
|
||||
BothSidesClosed,
|
||||
}
|
||||
|
||||
/// A substream for outgoing notification messages.
|
||||
#[pin_project::pin_project]
|
||||
pub struct NotificationsOutSubstream<TSubstream> {
|
||||
/// Substream where to send messages.
|
||||
#[pin]
|
||||
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,
|
||||
}
|
||||
|
||||
impl NotificationsIn {
|
||||
/// Builds a new potential upgrade.
|
||||
pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self {
|
||||
NotificationsIn {
|
||||
protocol_name: protocol_name.into(),
|
||||
max_notification_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeInfo for NotificationsIn {
|
||||
type Info = Cow<'static, [u8]>;
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
let bytes: Cow<'static, [u8]> = match &self.protocol_name {
|
||||
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
|
||||
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
|
||||
};
|
||||
iter::once(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn
|
||||
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Output = (Vec<u8>, NotificationsInSubstream<TSubstream>);
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
type Error = NotificationsHandshakeError;
|
||||
|
||||
fn upgrade_inbound(
|
||||
self,
|
||||
mut socket: TSubstream,
|
||||
_: Self::Info,
|
||||
) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
let initial_message_len = unsigned_varint::aio::read_usize(&mut socket).await?;
|
||||
if initial_message_len > MAX_HANDSHAKE_SIZE {
|
||||
return Err(NotificationsHandshakeError::TooLarge {
|
||||
requested: initial_message_len,
|
||||
max: MAX_HANDSHAKE_SIZE,
|
||||
});
|
||||
}
|
||||
|
||||
let mut initial_message = vec![0u8; initial_message_len];
|
||||
if !initial_message.is_empty() {
|
||||
socket.read_exact(&mut initial_message).await?;
|
||||
}
|
||||
|
||||
let mut codec = UviBytes::default();
|
||||
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
|
||||
|
||||
let substream = NotificationsInSubstream {
|
||||
socket: Framed::new(socket, codec),
|
||||
handshake: NotificationsInSubstreamHandshake::NotSent,
|
||||
};
|
||||
|
||||
Ok((initial_message, substream))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> NotificationsInSubstream<TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
/// Sends the handshake in order to inform the remote that we accept the substream.
|
||||
pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
|
||||
if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
|
||||
error!(target: "sub-libp2p", "Tried to send handshake twice");
|
||||
return;
|
||||
}
|
||||
|
||||
self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
|
||||
}
|
||||
|
||||
/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
|
||||
/// guaranteed to not generate any notification.
|
||||
pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Infallible, io::Error>> {
|
||||
let mut this = self.project();
|
||||
|
||||
loop {
|
||||
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
|
||||
NotificationsInSubstreamHandshake::PendingSend(msg) =>
|
||||
match Sink::poll_ready(this.socket.as_mut(), cx) {
|
||||
Poll::Ready(_) => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Flush;
|
||||
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
|
||||
Ok(()) => {},
|
||||
Err(err) => return Poll::Ready(Err(err)),
|
||||
}
|
||||
},
|
||||
Poll::Pending => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
|
||||
return Poll::Pending
|
||||
}
|
||||
},
|
||||
NotificationsInSubstreamHandshake::Flush =>
|
||||
match Sink::poll_flush(this.socket.as_mut(), cx)? {
|
||||
Poll::Ready(()) =>
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Sent,
|
||||
Poll::Pending => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Flush;
|
||||
return Poll::Pending
|
||||
}
|
||||
},
|
||||
|
||||
st @ NotificationsInSubstreamHandshake::NotSent |
|
||||
st @ NotificationsInSubstreamHandshake::Sent |
|
||||
st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
|
||||
st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
|
||||
*this.handshake = st;
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> Stream for NotificationsInSubstream<TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
type Item = Result<BytesMut, io::Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
|
||||
// This `Stream` implementation first tries to send back the handshake if necessary.
|
||||
loop {
|
||||
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
|
||||
NotificationsInSubstreamHandshake::NotSent => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::NotSent;
|
||||
return Poll::Pending
|
||||
},
|
||||
NotificationsInSubstreamHandshake::PendingSend(msg) =>
|
||||
match Sink::poll_ready(this.socket.as_mut(), cx) {
|
||||
Poll::Ready(_) => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Flush;
|
||||
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
|
||||
Ok(()) => {},
|
||||
Err(err) => return Poll::Ready(Some(Err(err))),
|
||||
}
|
||||
},
|
||||
Poll::Pending => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
|
||||
return Poll::Pending
|
||||
}
|
||||
},
|
||||
NotificationsInSubstreamHandshake::Flush =>
|
||||
match Sink::poll_flush(this.socket.as_mut(), cx)? {
|
||||
Poll::Ready(()) =>
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Sent,
|
||||
Poll::Pending => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Flush;
|
||||
return Poll::Pending
|
||||
}
|
||||
},
|
||||
|
||||
NotificationsInSubstreamHandshake::Sent => {
|
||||
match Stream::poll_next(this.socket.as_mut(), cx) {
|
||||
Poll::Ready(None) => *this.handshake =
|
||||
NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
|
||||
Poll::Ready(Some(msg)) => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Sent;
|
||||
return Poll::Ready(Some(msg))
|
||||
},
|
||||
Poll::Pending => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::Sent;
|
||||
return Poll::Pending
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
|
||||
match Sink::poll_close(this.socket.as_mut(), cx)? {
|
||||
Poll::Ready(()) =>
|
||||
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
|
||||
Poll::Pending => {
|
||||
*this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
|
||||
return Poll::Pending
|
||||
}
|
||||
},
|
||||
|
||||
NotificationsInSubstreamHandshake::BothSidesClosed =>
|
||||
return Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NotificationsOut {
|
||||
/// Builds a new potential upgrade.
|
||||
pub fn new(
|
||||
protocol_name: impl Into<Cow<'static, str>>,
|
||||
initial_message: impl Into<Vec<u8>>,
|
||||
max_notification_size: u64,
|
||||
) -> Self {
|
||||
let initial_message = initial_message.into();
|
||||
if initial_message.len() > MAX_HANDSHAKE_SIZE {
|
||||
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
|
||||
}
|
||||
|
||||
NotificationsOut {
|
||||
protocol_name: protocol_name.into(),
|
||||
initial_message,
|
||||
max_notification_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeInfo for NotificationsOut {
|
||||
type Info = Cow<'static, [u8]>;
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
let bytes: Cow<'static, [u8]> = match &self.protocol_name {
|
||||
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
|
||||
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
|
||||
};
|
||||
iter::once(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut
|
||||
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Output = (Vec<u8>, NotificationsOutSubstream<TSubstream>);
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
type Error = NotificationsHandshakeError;
|
||||
|
||||
fn upgrade_outbound(
|
||||
self,
|
||||
mut socket: TSubstream,
|
||||
_: Self::Info,
|
||||
) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
upgrade::write_with_len_prefix(&mut socket, &self.initial_message).await?;
|
||||
|
||||
// Reading handshake.
|
||||
let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
|
||||
if handshake_len > MAX_HANDSHAKE_SIZE {
|
||||
return Err(NotificationsHandshakeError::TooLarge {
|
||||
requested: handshake_len,
|
||||
max: MAX_HANDSHAKE_SIZE,
|
||||
});
|
||||
}
|
||||
|
||||
let mut handshake = vec![0u8; handshake_len];
|
||||
if !handshake.is_empty() {
|
||||
socket.read_exact(&mut handshake).await?;
|
||||
}
|
||||
|
||||
let mut codec = UviBytes::default();
|
||||
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
|
||||
|
||||
Ok((handshake, NotificationsOutSubstream {
|
||||
socket: Framed::new(socket, codec),
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
type Error = NotificationsOutError;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
let mut this = self.project();
|
||||
Sink::poll_ready(this.socket.as_mut(), cx)
|
||||
.map_err(NotificationsOutError::Io)
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
|
||||
let mut this = self.project();
|
||||
Sink::start_send(this.socket.as_mut(), io::Cursor::new(item))
|
||||
.map_err(NotificationsOutError::Io)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
let mut this = self.project();
|
||||
Sink::poll_flush(this.socket.as_mut(), cx)
|
||||
.map_err(NotificationsOutError::Io)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
let mut this = self.project();
|
||||
Sink::poll_close(this.socket.as_mut(), cx)
|
||||
.map_err(NotificationsOutError::Io)
|
||||
}
|
||||
}
|
||||
|
||||
/// Error generated by sending on a notifications out substream.
|
||||
#[derive(Debug, derive_more::From, derive_more::Display)]
|
||||
pub enum NotificationsHandshakeError {
|
||||
/// I/O error on the substream.
|
||||
Io(io::Error),
|
||||
|
||||
/// Initial message or handshake was too large.
|
||||
#[display(fmt = "Initial message or handshake was too large: {}", requested)]
|
||||
TooLarge {
|
||||
/// Size requested by the remote.
|
||||
requested: usize,
|
||||
/// Maximum allowed,
|
||||
max: usize,
|
||||
},
|
||||
|
||||
/// Error while decoding the variable-length integer.
|
||||
VarintDecode(unsigned_varint::decode::Error),
|
||||
}
|
||||
|
||||
impl From<unsigned_varint::io::ReadError> for NotificationsHandshakeError {
|
||||
fn from(err: unsigned_varint::io::ReadError) -> Self {
|
||||
match err {
|
||||
unsigned_varint::io::ReadError::Io(err) => NotificationsHandshakeError::Io(err),
|
||||
unsigned_varint::io::ReadError::Decode(err) => NotificationsHandshakeError::VarintDecode(err),
|
||||
_ => {
|
||||
log::warn!("Unrecognized varint decoding error");
|
||||
NotificationsHandshakeError::Io(From::from(io::ErrorKind::InvalidData))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Error generated by sending on a notifications out substream.
|
||||
#[derive(Debug, derive_more::From, derive_more::Display)]
|
||||
pub enum NotificationsOutError {
|
||||
/// I/O error on the substream.
|
||||
Io(io::Error),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{NotificationsIn, NotificationsOut};
|
||||
|
||||
use async_std::net::{TcpListener, TcpStream};
|
||||
use futures::{prelude::*, channel::oneshot};
|
||||
use libp2p::core::upgrade;
|
||||
use std::borrow::Cow;
|
||||
|
||||
#[test]
|
||||
fn basic_works() {
|
||||
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
|
||||
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
|
||||
|
||||
let client = async_std::task::spawn(async move {
|
||||
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
|
||||
let (handshake, mut substream) = upgrade::apply_outbound(
|
||||
socket,
|
||||
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
|
||||
upgrade::Version::V1
|
||||
).await.unwrap();
|
||||
|
||||
assert_eq!(handshake, b"hello world");
|
||||
substream.send(b"test message".to_vec()).await.unwrap();
|
||||
});
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
|
||||
|
||||
let (socket, _) = listener.accept().await.unwrap();
|
||||
let (initial_message, mut substream) = upgrade::apply_inbound(
|
||||
socket,
|
||||
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
|
||||
).await.unwrap();
|
||||
|
||||
assert_eq!(initial_message, b"initial message");
|
||||
substream.send_handshake(&b"hello world"[..]);
|
||||
|
||||
let msg = substream.next().await.unwrap().unwrap();
|
||||
assert_eq!(msg.as_ref(), b"test message");
|
||||
});
|
||||
|
||||
async_std::task::block_on(client);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_handshake() {
|
||||
// Check that everything still works when the handshake messages are empty.
|
||||
|
||||
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
|
||||
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
|
||||
|
||||
let client = async_std::task::spawn(async move {
|
||||
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
|
||||
let (handshake, mut substream) = upgrade::apply_outbound(
|
||||
socket,
|
||||
NotificationsOut::new(PROTO_NAME, vec![], 1024 * 1024),
|
||||
upgrade::Version::V1
|
||||
).await.unwrap();
|
||||
|
||||
assert!(handshake.is_empty());
|
||||
substream.send(Default::default()).await.unwrap();
|
||||
});
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
|
||||
|
||||
let (socket, _) = listener.accept().await.unwrap();
|
||||
let (initial_message, mut substream) = upgrade::apply_inbound(
|
||||
socket,
|
||||
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
|
||||
).await.unwrap();
|
||||
|
||||
assert!(initial_message.is_empty());
|
||||
substream.send_handshake(vec![]);
|
||||
|
||||
let msg = substream.next().await.unwrap().unwrap();
|
||||
assert!(msg.as_ref().is_empty());
|
||||
});
|
||||
|
||||
async_std::task::block_on(client);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn refused() {
|
||||
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
|
||||
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
|
||||
|
||||
let client = async_std::task::spawn(async move {
|
||||
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
|
||||
let outcome = upgrade::apply_outbound(
|
||||
socket,
|
||||
NotificationsOut::new(PROTO_NAME, &b"hello"[..], 1024 * 1024),
|
||||
upgrade::Version::V1
|
||||
).await;
|
||||
|
||||
// Despite the protocol negotiation being successfully conducted on the listener
|
||||
// side, we have to receive an error here because the listener didn't send the
|
||||
// handshake.
|
||||
assert!(outcome.is_err());
|
||||
});
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
|
||||
|
||||
let (socket, _) = listener.accept().await.unwrap();
|
||||
let (initial_msg, substream) = upgrade::apply_inbound(
|
||||
socket,
|
||||
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
|
||||
).await.unwrap();
|
||||
|
||||
assert_eq!(initial_msg, b"hello");
|
||||
|
||||
// We successfully upgrade to the protocol, but then close the substream.
|
||||
drop(substream);
|
||||
});
|
||||
|
||||
async_std::task::block_on(client);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn large_initial_message_refused() {
|
||||
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
|
||||
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
|
||||
|
||||
let client = async_std::task::spawn(async move {
|
||||
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
|
||||
let ret = upgrade::apply_outbound(
|
||||
socket,
|
||||
// We check that an initial message that is too large gets refused.
|
||||
NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
|
||||
upgrade::Version::V1
|
||||
).await;
|
||||
assert!(ret.is_err());
|
||||
});
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
|
||||
|
||||
let (socket, _) = listener.accept().await.unwrap();
|
||||
let ret = upgrade::apply_inbound(
|
||||
socket,
|
||||
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
|
||||
).await;
|
||||
assert!(ret.is_err());
|
||||
});
|
||||
|
||||
async_std::task::block_on(client);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn large_handshake_refused() {
|
||||
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
|
||||
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
|
||||
|
||||
let client = async_std::task::spawn(async move {
|
||||
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
|
||||
let ret = upgrade::apply_outbound(
|
||||
socket,
|
||||
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
|
||||
upgrade::Version::V1
|
||||
).await;
|
||||
assert!(ret.is_err());
|
||||
});
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
|
||||
|
||||
let (socket, _) = listener.accept().await.unwrap();
|
||||
let (initial_message, mut substream) = upgrade::apply_inbound(
|
||||
socket,
|
||||
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
|
||||
).await.unwrap();
|
||||
assert_eq!(initial_message, b"initial message");
|
||||
|
||||
// We check that a handshake that is too large gets refused.
|
||||
substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>());
|
||||
let _ = substream.next().await;
|
||||
});
|
||||
|
||||
async_std::task::block_on(client);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user