mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 11:07:56 +00:00
Move legacy_proto to protocol (#4296)
* Move legacy_proto to protocol * Edition 2018ize legacy_proto * Some basic documentation
This commit is contained in:
committed by
Gavin Wood
parent
e126ca9b2c
commit
2231c06294
@@ -0,0 +1,28 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Implementation of libp2p's `NetworkBehaviour` trait that opens a single substream with the
|
||||
//! remote and then allows any communication with them.
|
||||
//!
|
||||
//! The `Protocol` struct uses `LegacyProto` in order to open substreams with the rest of the
|
||||
//! network, then performs the Substrate protocol handling on top.
|
||||
|
||||
pub use self::behaviour::{LegacyProto, LegacyProtoOut};
|
||||
|
||||
mod behaviour;
|
||||
mod handler;
|
||||
mod upgrade;
|
||||
mod tests;
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,651 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use super::upgrade::{RegisteredProtocol, RegisteredProtocolEvent, RegisteredProtocolSubstream};
|
||||
use bytes::BytesMut;
|
||||
use futures::prelude::*;
|
||||
use futures03::{compat::Compat, TryFutureExt as _};
|
||||
use futures_timer::Delay;
|
||||
use libp2p::core::{ConnectedPoint, PeerId, Endpoint};
|
||||
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
|
||||
use libp2p::swarm::{
|
||||
ProtocolsHandler, ProtocolsHandlerEvent,
|
||||
IntoProtocolsHandler,
|
||||
KeepAlive,
|
||||
ProtocolsHandlerUpgrErr,
|
||||
SubstreamProtocol,
|
||||
};
|
||||
use log::{debug, error};
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::{borrow::Cow, error, fmt, io, marker::PhantomData, mem, time::Duration};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// Implements the `IntoProtocolsHandler` trait of libp2p.
|
||||
///
|
||||
/// Every time a connection with a remote starts, an instance of this struct is created and
|
||||
/// sent to a background task dedicated to this connection. Once the connection is established,
|
||||
/// it is turned into a `CustomProtoHandler`. It then handles all communications that are specific
|
||||
/// to Substrate on that single connection.
|
||||
///
|
||||
/// Note that there can be multiple instance of this struct simultaneously for same peer. However
|
||||
/// if that happens, only one main instance can communicate with the outer layers of the code. In
|
||||
/// other words, the outer layers of the code only ever see one handler.
|
||||
///
|
||||
/// ## State of the handler
|
||||
///
|
||||
/// There are six possible states for the handler:
|
||||
///
|
||||
/// - Enabled and open, which is a normal operation.
|
||||
/// - Enabled and closed, in which case it will try to open substreams.
|
||||
/// - Disabled and open, in which case it will try to close substreams.
|
||||
/// - Disabled and closed, in which case the handler is idle. The connection will be
|
||||
/// garbage-collected after a few seconds if nothing more happens.
|
||||
/// - Initializing and open.
|
||||
/// - Initializing and closed, which is the state the handler starts in.
|
||||
///
|
||||
/// The Init/Enabled/Disabled state is entirely controlled by the user by sending `Enable` or
|
||||
/// `Disable` messages to the handler. The handler itself never transitions automatically between
|
||||
/// these states. For example, if the handler reports a network misbehaviour, it will close the
|
||||
/// substreams but it is the role of the user to send a `Disabled` event if it wants the connection
|
||||
/// to close. Otherwise, the handler will try to reopen substreams.
|
||||
/// The handler starts in the "Initializing" state and must be transitionned to Enabled or Disabled
|
||||
/// as soon as possible.
|
||||
///
|
||||
/// The Open/Closed state is decided by the handler and is reported with the `CustomProtocolOpen`
|
||||
/// and `CustomProtocolClosed` events. The `CustomMessage` event can only be generated if the
|
||||
/// handler is open.
|
||||
///
|
||||
/// ## How it works
|
||||
///
|
||||
/// When the handler is created, it is initially in the `Init` state and waits for either a
|
||||
/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to
|
||||
/// toggle the handler between the disabled and enabled states.
|
||||
///
|
||||
/// When the handler switches to "enabled", it opens a substream and negotiates the protocol named
|
||||
/// `/substrate/xxx`, where `xxx` is chosen by the user and depends on the chain.
|
||||
///
|
||||
/// For backwards compatibility reasons, when we switch to "enabled" for the first time (while we
|
||||
/// are still in "init" mode) and we are the connection listener, we don't open a substream.
|
||||
///
|
||||
/// In order the handle the situation where both the remote and us get enabled at the same time,
|
||||
/// we tolerate multiple substreams open at the same time. Messages are transmitted on an arbitrary
|
||||
/// substream. The endpoints don't try to agree on a single substream.
|
||||
///
|
||||
/// We consider that we are now "closed" if the remote closes all the existing substreams.
|
||||
/// Re-opening it can then be performed by closing all active substream and re-opening one.
|
||||
///
|
||||
pub struct CustomProtoHandlerProto<TSubstream> {
|
||||
/// Configuration for the protocol upgrade to negotiate.
|
||||
protocol: RegisteredProtocol,
|
||||
|
||||
/// Marker to pin the generic type.
|
||||
marker: PhantomData<TSubstream>,
|
||||
}
|
||||
|
||||
impl<TSubstream> CustomProtoHandlerProto<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// Builds a new `CustomProtoHandlerProto`.
|
||||
pub fn new(protocol: RegisteredProtocol) -> Self {
|
||||
CustomProtoHandlerProto {
|
||||
protocol,
|
||||
marker: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> IntoProtocolsHandler for CustomProtoHandlerProto<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Handler = CustomProtoHandler<TSubstream>;
|
||||
|
||||
fn inbound_protocol(&self) -> RegisteredProtocol {
|
||||
self.protocol.clone()
|
||||
}
|
||||
|
||||
fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
|
||||
CustomProtoHandler {
|
||||
protocol: self.protocol,
|
||||
endpoint: connected_point.to_endpoint(),
|
||||
remote_peer_id: remote_peer_id.clone(),
|
||||
state: ProtocolState::Init {
|
||||
substreams: SmallVec::new(),
|
||||
init_deadline: Delay::new(Duration::from_secs(5)).compat()
|
||||
},
|
||||
events_queue: SmallVec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The actual handler once the connection has been established.
|
||||
pub struct CustomProtoHandler<TSubstream> {
|
||||
/// Configuration for the protocol upgrade to negotiate.
|
||||
protocol: RegisteredProtocol,
|
||||
|
||||
/// State of the communications with the remote.
|
||||
state: ProtocolState<TSubstream>,
|
||||
|
||||
/// Identifier of the node we're talking to. Used only for logging purposes and shouldn't have
|
||||
/// any influence on the behaviour.
|
||||
remote_peer_id: PeerId,
|
||||
|
||||
/// Whether we are the connection dialer or listener. Used to determine who, between the local
|
||||
/// node and the remote node, has priority.
|
||||
endpoint: Endpoint,
|
||||
|
||||
/// Queue of events to send to the outside.
|
||||
///
|
||||
/// This queue must only ever be modified to insert elements at the back, or remove the first
|
||||
/// element.
|
||||
events_queue: SmallVec<[ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>; 16]>,
|
||||
}
|
||||
|
||||
/// State of the handler.
|
||||
enum ProtocolState<TSubstream> {
|
||||
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
|
||||
Init {
|
||||
/// List of substreams opened by the remote but that haven't been processed yet.
|
||||
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
|
||||
/// Deadline after which the initialization is abnormally long.
|
||||
init_deadline: Compat<Delay>,
|
||||
},
|
||||
|
||||
/// Handler is opening a substream in order to activate itself.
|
||||
/// If we are in this state, we haven't sent any `CustomProtocolOpen` yet.
|
||||
Opening {
|
||||
/// Deadline after which the opening is abnormally long.
|
||||
deadline: Compat<Delay>,
|
||||
},
|
||||
|
||||
/// Normal operating mode. Contains the substreams that are open.
|
||||
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
|
||||
Normal {
|
||||
/// The substreams where bidirectional communications happen.
|
||||
substreams: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>,
|
||||
/// Contains substreams which are being shut down.
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 4]>,
|
||||
},
|
||||
|
||||
/// We are disabled. Contains substreams that are being closed.
|
||||
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
|
||||
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
|
||||
Disabled {
|
||||
/// List of substreams to shut down.
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<TSubstream>; 6]>,
|
||||
|
||||
/// If true, we should reactivate the handler after all the substreams in `shutdown` have
|
||||
/// been closed.
|
||||
///
|
||||
/// Since we don't want to mix old and new substreams, we wait for all old substreams to
|
||||
/// be closed before opening any new one.
|
||||
reenable: bool,
|
||||
},
|
||||
|
||||
/// In this state, we don't care about anything anymore and need to kill the connection as soon
|
||||
/// as possible.
|
||||
KillAsap,
|
||||
|
||||
/// We sometimes temporarily switch to this state during processing. If we are in this state
|
||||
/// at the beginning of a method, that means something bad happened in the source code.
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
/// Event that can be received by a `CustomProtoHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum CustomProtoHandlerIn {
|
||||
/// The node should start using custom protocols.
|
||||
Enable,
|
||||
|
||||
/// The node should stop using custom protocols.
|
||||
Disable,
|
||||
|
||||
/// Sends a message through a custom protocol substream.
|
||||
SendCustomMessage {
|
||||
/// The message to send.
|
||||
message: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Event that can be emitted by a `CustomProtoHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum CustomProtoHandlerOut {
|
||||
/// Opened a custom protocol with the remote.
|
||||
CustomProtocolOpen {
|
||||
/// Version of the protocol that has been opened.
|
||||
version: u8,
|
||||
},
|
||||
|
||||
/// Closed a custom protocol with the remote.
|
||||
CustomProtocolClosed {
|
||||
/// Reason why the substream closed, for diagnostic purposes.
|
||||
reason: Cow<'static, str>,
|
||||
},
|
||||
|
||||
/// Receives a message on a custom protocol substream.
|
||||
CustomMessage {
|
||||
/// Message that has been received.
|
||||
message: BytesMut,
|
||||
},
|
||||
|
||||
/// A substream to the remote is clogged. The send buffer is very large, and we should print
|
||||
/// a diagnostic message and/or avoid sending more data.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<Vec<u8>>,
|
||||
},
|
||||
|
||||
/// An error has happened on the protocol level with this node.
|
||||
ProtocolError {
|
||||
/// If true the error is severe, such as a protocol violation.
|
||||
is_severe: bool,
|
||||
/// The error that happened.
|
||||
error: Box<dyn error::Error + Send + Sync>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TSubstream> CustomProtoHandler<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// Enables the handler.
|
||||
fn enable(&mut self) {
|
||||
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Poisoned => {
|
||||
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
|
||||
self.remote_peer_id);
|
||||
ProtocolState::Poisoned
|
||||
}
|
||||
|
||||
ProtocolState::Init { substreams: incoming, .. } => {
|
||||
if incoming.is_empty() {
|
||||
if let Endpoint::Dialer = self.endpoint {
|
||||
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(self.protocol.clone()),
|
||||
info: (),
|
||||
});
|
||||
}
|
||||
ProtocolState::Opening {
|
||||
deadline: Delay::new(Duration::from_secs(60)).compat()
|
||||
}
|
||||
|
||||
} else {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolOpen {
|
||||
version: incoming[0].protocol_version()
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::Normal {
|
||||
substreams: incoming.into_iter().collect(),
|
||||
shutdown: SmallVec::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
st @ ProtocolState::KillAsap => st,
|
||||
st @ ProtocolState::Opening { .. } => st,
|
||||
st @ ProtocolState::Normal { .. } => st,
|
||||
ProtocolState::Disabled { shutdown, .. } => {
|
||||
ProtocolState::Disabled { shutdown, reenable: true }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Disables the handler.
|
||||
fn disable(&mut self) {
|
||||
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Poisoned => {
|
||||
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
|
||||
self.remote_peer_id);
|
||||
ProtocolState::Poisoned
|
||||
}
|
||||
|
||||
ProtocolState::Init { substreams: mut shutdown, .. } => {
|
||||
for s in &mut shutdown {
|
||||
s.shutdown();
|
||||
}
|
||||
ProtocolState::Disabled { shutdown, reenable: false }
|
||||
}
|
||||
|
||||
ProtocolState::Opening { .. } | ProtocolState::Normal { .. } =>
|
||||
// At the moment, if we get disabled while things were working, we kill the entire
|
||||
// connection in order to force a reset of the state.
|
||||
// This is obviously an extremely shameful way to do things, but at the time of
|
||||
// the writing of this comment, the networking works very poorly and a solution
|
||||
// needs to be found.
|
||||
ProtocolState::KillAsap,
|
||||
|
||||
ProtocolState::Disabled { shutdown, .. } =>
|
||||
ProtocolState::Disabled { shutdown, reenable: false },
|
||||
|
||||
ProtocolState::KillAsap => ProtocolState::KillAsap,
|
||||
};
|
||||
}
|
||||
|
||||
/// Polls the state for events. Optionally returns an event to produce.
|
||||
#[must_use]
|
||||
fn poll_state(&mut self)
|
||||
-> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), CustomProtoHandlerOut>> {
|
||||
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Poisoned => {
|
||||
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
|
||||
self.remote_peer_id);
|
||||
self.state = ProtocolState::Poisoned;
|
||||
None
|
||||
}
|
||||
|
||||
ProtocolState::Init { substreams, mut init_deadline } => {
|
||||
match init_deadline.poll() {
|
||||
Ok(Async::Ready(())) => {
|
||||
init_deadline = Delay::new(Duration::from_secs(60)).compat();
|
||||
error!(target: "sub-libp2p", "Handler initialization process is too long \
|
||||
with {:?}", self.remote_peer_id)
|
||||
},
|
||||
Ok(Async::NotReady) => {}
|
||||
Err(_) => error!(target: "sub-libp2p", "Tokio timer has errored")
|
||||
}
|
||||
|
||||
self.state = ProtocolState::Init { substreams, init_deadline };
|
||||
None
|
||||
}
|
||||
|
||||
ProtocolState::Opening { mut deadline } => {
|
||||
match deadline.poll() {
|
||||
Ok(Async::Ready(())) => {
|
||||
deadline = Delay::new(Duration::from_secs(60)).compat();
|
||||
let event = CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe: true,
|
||||
error: "Timeout when opening protocol".to_string().into(),
|
||||
};
|
||||
self.state = ProtocolState::Opening { deadline };
|
||||
Some(ProtocolsHandlerEvent::Custom(event))
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
self.state = ProtocolState::Opening { deadline };
|
||||
None
|
||||
},
|
||||
Err(_) => {
|
||||
error!(target: "sub-libp2p", "Tokio timer has errored");
|
||||
deadline = Delay::new(Duration::from_secs(60)).compat();
|
||||
self.state = ProtocolState::Opening { deadline };
|
||||
None
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::Normal { mut substreams, mut shutdown } => {
|
||||
for n in (0..substreams.len()).rev() {
|
||||
let mut substream = substreams.swap_remove(n);
|
||||
match substream.poll() {
|
||||
Ok(Async::NotReady) => substreams.push(substream),
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
|
||||
let event = CustomProtoHandlerOut::CustomMessage {
|
||||
message
|
||||
};
|
||||
substreams.push(substream);
|
||||
self.state = ProtocolState::Normal { substreams, shutdown };
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
},
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
|
||||
let event = CustomProtoHandlerOut::Clogged {
|
||||
messages,
|
||||
};
|
||||
substreams.push(substream);
|
||||
self.state = ProtocolState::Normal { substreams, shutdown };
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
shutdown.push(substream);
|
||||
if substreams.is_empty() {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolClosed {
|
||||
reason: "All substreams have been closed by the remote".into(),
|
||||
};
|
||||
self.state = ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
reenable: true
|
||||
};
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
if substreams.is_empty() {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolClosed {
|
||||
reason: format!("Error on the last substream: {:?}", err).into(),
|
||||
};
|
||||
self.state = ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
reenable: true
|
||||
};
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
} else {
|
||||
debug!(target: "sub-libp2p", "Error on extra substream: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This code is reached is none if and only if none of the substreams are in a ready state.
|
||||
self.state = ProtocolState::Normal { substreams, shutdown };
|
||||
None
|
||||
}
|
||||
|
||||
ProtocolState::Disabled { mut shutdown, reenable } => {
|
||||
shutdown_list(&mut shutdown);
|
||||
// If `reenable` is `true`, that means we should open the substreams system again
|
||||
// after all the substreams are closed.
|
||||
if reenable && shutdown.is_empty() {
|
||||
self.state = ProtocolState::Opening {
|
||||
deadline: Delay::new(Duration::from_secs(60)).compat()
|
||||
};
|
||||
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(self.protocol.clone()),
|
||||
info: (),
|
||||
})
|
||||
} else {
|
||||
self.state = ProtocolState::Disabled { shutdown, reenable };
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::KillAsap => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
|
||||
fn inject_fully_negotiated(
|
||||
&mut self,
|
||||
mut substream: RegisteredProtocolSubstream<TSubstream>
|
||||
) {
|
||||
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
ProtocolState::Poisoned => {
|
||||
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
|
||||
self.remote_peer_id);
|
||||
ProtocolState::Poisoned
|
||||
}
|
||||
|
||||
ProtocolState::Init { mut substreams, init_deadline } => {
|
||||
if substream.endpoint() == Endpoint::Dialer {
|
||||
error!(target: "sub-libp2p", "Opened dialing substream with {:?} before \
|
||||
initialization", self.remote_peer_id);
|
||||
}
|
||||
substreams.push(substream);
|
||||
ProtocolState::Init { substreams, init_deadline }
|
||||
}
|
||||
|
||||
ProtocolState::Opening { .. } => {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolOpen {
|
||||
version: substream.protocol_version()
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::Normal {
|
||||
substreams: smallvec![substream],
|
||||
shutdown: SmallVec::new()
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::Normal { substreams: mut existing, shutdown } => {
|
||||
existing.push(substream);
|
||||
ProtocolState::Normal { substreams: existing, shutdown }
|
||||
}
|
||||
|
||||
ProtocolState::Disabled { mut shutdown, .. } => {
|
||||
substream.shutdown();
|
||||
shutdown.push(substream);
|
||||
ProtocolState::Disabled { shutdown, reenable: false }
|
||||
}
|
||||
|
||||
ProtocolState::KillAsap => ProtocolState::KillAsap,
|
||||
};
|
||||
}
|
||||
|
||||
/// Sends a message to the remote.
|
||||
fn send_message(&mut self, message: Vec<u8>) {
|
||||
match self.state {
|
||||
ProtocolState::Normal { ref mut substreams, .. } =>
|
||||
substreams[0].send_message(message),
|
||||
|
||||
_ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \
|
||||
with {:?}", self.remote_peer_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> ProtocolsHandler for CustomProtoHandler<TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite {
|
||||
type InEvent = CustomProtoHandlerIn;
|
||||
type OutEvent = CustomProtoHandlerOut;
|
||||
type Substream = TSubstream;
|
||||
type Error = ConnectionKillError;
|
||||
type InboundProtocol = RegisteredProtocol;
|
||||
type OutboundProtocol = RegisteredProtocol;
|
||||
type OutboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
|
||||
SubstreamProtocol::new(self.protocol.clone())
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
proto: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
|
||||
) {
|
||||
self.inject_fully_negotiated(proto);
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
proto: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
|
||||
_: Self::OutboundOpenInfo
|
||||
) {
|
||||
self.inject_fully_negotiated(proto);
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, message: CustomProtoHandlerIn) {
|
||||
match message {
|
||||
CustomProtoHandlerIn::Disable => self.disable(),
|
||||
CustomProtoHandlerIn::Enable => self.enable(),
|
||||
CustomProtoHandlerIn::SendCustomMessage { message } =>
|
||||
self.send_message(message),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr<io::Error>) {
|
||||
let is_severe = match err {
|
||||
ProtocolsHandlerUpgrErr::Upgrade(_) => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe,
|
||||
error: Box::new(err),
|
||||
}));
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
match self.state {
|
||||
ProtocolState::Init { .. } | ProtocolState::Opening { .. } |
|
||||
ProtocolState::Normal { .. } => KeepAlive::Yes,
|
||||
ProtocolState::Disabled { .. } | ProtocolState::Poisoned |
|
||||
ProtocolState::KillAsap => KeepAlive::No,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
) -> Poll<
|
||||
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
|
||||
Self::Error,
|
||||
> {
|
||||
// Flush the events queue if necessary.
|
||||
if !self.events_queue.is_empty() {
|
||||
let event = self.events_queue.remove(0);
|
||||
return Ok(Async::Ready(event))
|
||||
}
|
||||
|
||||
// Kill the connection if needed.
|
||||
if let ProtocolState::KillAsap = self.state {
|
||||
return Err(ConnectionKillError);
|
||||
}
|
||||
|
||||
// Process all the substreams.
|
||||
if let Some(event) = self.poll_state() {
|
||||
return Ok(Async::Ready(event))
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> fmt::Debug for CustomProtoHandler<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
f.debug_struct("CustomProtoHandler")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
|
||||
/// shut down are removed from the list.
|
||||
fn shutdown_list<TSubstream>
|
||||
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<TSubstream>>>)
|
||||
where TSubstream: AsyncRead + AsyncWrite {
|
||||
'outer: for n in (0..list.len()).rev() {
|
||||
let mut substream = list.swap_remove(n);
|
||||
loop {
|
||||
match substream.poll() {
|
||||
Ok(Async::Ready(Some(_))) => {}
|
||||
Ok(Async::NotReady) => break,
|
||||
Err(_) | Ok(Async::Ready(None)) => continue 'outer,
|
||||
}
|
||||
}
|
||||
list.push(substream);
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned when switching from normal to disabled.
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionKillError;
|
||||
|
||||
impl error::Error for ConnectionKillError {
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionKillError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Connection kill when switching from normal to disabled")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,410 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
#![cfg(test)]
|
||||
|
||||
use futures::{future, prelude::*, try_ready};
|
||||
use codec::{Encode, Decode};
|
||||
use libp2p::core::nodes::Substream;
|
||||
use libp2p::core::{ConnectedPoint, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
||||
use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler};
|
||||
use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction};
|
||||
use libp2p::{PeerId, Multiaddr, Transport};
|
||||
use rand::seq::SliceRandom;
|
||||
use std::{io, time::Duration, time::Instant};
|
||||
use crate::message::Message;
|
||||
use crate::protocol::legacy_proto::{LegacyProto, LegacyProtoOut};
|
||||
use test_client::runtime::Block;
|
||||
|
||||
/// Builds two nodes that have each other as bootstrap nodes.
|
||||
/// This is to be used only for testing, and a panic will happen if something goes wrong.
|
||||
fn build_nodes()
|
||||
-> (
|
||||
Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, CustomProtoWithAddr>,
|
||||
Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, CustomProtoWithAddr>
|
||||
) {
|
||||
let mut out = Vec::with_capacity(2);
|
||||
|
||||
let keypairs: Vec<_> = (0..2).map(|_| libp2p::identity::Keypair::generate_ed25519()).collect();
|
||||
let addrs: Vec<Multiaddr> = (0..2)
|
||||
.map(|_| format!("/memory/{}", rand::random::<u64>()).parse().unwrap())
|
||||
.collect();
|
||||
|
||||
for index in 0 .. 2 {
|
||||
let keypair = keypairs[index].clone();
|
||||
let transport = libp2p::core::transport::MemoryTransport
|
||||
.and_then(move |out, endpoint| {
|
||||
let secio = libp2p::secio::SecioConfig::new(keypair);
|
||||
libp2p::core::upgrade::apply(
|
||||
out,
|
||||
secio,
|
||||
endpoint,
|
||||
libp2p::core::upgrade::Version::V1
|
||||
)
|
||||
})
|
||||
.and_then(move |(peer_id, stream), endpoint| {
|
||||
libp2p::core::upgrade::apply(
|
||||
stream,
|
||||
libp2p::yamux::Config::default(),
|
||||
endpoint,
|
||||
libp2p::core::upgrade::Version::V1
|
||||
)
|
||||
.map(|muxer| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)))
|
||||
})
|
||||
.timeout(Duration::from_secs(20))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||
.boxed();
|
||||
|
||||
let (peerset, _) = peerset::Peerset::from_config(peerset::PeersetConfig {
|
||||
in_peers: 25,
|
||||
out_peers: 25,
|
||||
bootnodes: if index == 0 {
|
||||
keypairs
|
||||
.iter()
|
||||
.skip(1)
|
||||
.map(|keypair| keypair.public().into_peer_id())
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
},
|
||||
reserved_only: false,
|
||||
reserved_nodes: Vec::new(),
|
||||
});
|
||||
|
||||
let behaviour = CustomProtoWithAddr {
|
||||
inner: LegacyProto::new(&b"test"[..], &[1], peerset),
|
||||
addrs: addrs
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(n, a)| if n != index {
|
||||
Some((keypairs[n].public().into_peer_id(), a.clone()))
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
let mut swarm = Swarm::new(
|
||||
transport,
|
||||
behaviour,
|
||||
keypairs[index].public().into_peer_id()
|
||||
);
|
||||
Swarm::listen_on(&mut swarm, addrs[index].clone()).unwrap();
|
||||
out.push(swarm);
|
||||
}
|
||||
|
||||
// Final output
|
||||
let mut out_iter = out.into_iter();
|
||||
let first = out_iter.next().unwrap();
|
||||
let second = out_iter.next().unwrap();
|
||||
(first, second)
|
||||
}
|
||||
|
||||
/// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it.
|
||||
struct CustomProtoWithAddr {
|
||||
inner: LegacyProto<Substream<StreamMuxerBox>>,
|
||||
addrs: Vec<(PeerId, Multiaddr)>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for CustomProtoWithAddr {
|
||||
type Target = LegacyProto<Substream<StreamMuxerBox>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for CustomProtoWithAddr {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkBehaviour for CustomProtoWithAddr {
|
||||
type ProtocolsHandler =
|
||||
<LegacyProto<Substream<StreamMuxerBox>> as NetworkBehaviour>::ProtocolsHandler;
|
||||
type OutEvent = <LegacyProto<Substream<StreamMuxerBox>> as NetworkBehaviour>::OutEvent;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
self.inner.new_handler()
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
|
||||
let mut list = self.inner.addresses_of_peer(peer_id);
|
||||
for (p, a) in self.addrs.iter() {
|
||||
if p == peer_id {
|
||||
list.push(a.clone());
|
||||
}
|
||||
}
|
||||
list
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
|
||||
self.inner.inject_connected(peer_id, endpoint)
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
|
||||
self.inner.inject_disconnected(peer_id, endpoint)
|
||||
}
|
||||
|
||||
fn inject_node_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
|
||||
) {
|
||||
self.inner.inject_node_event(peer_id, event)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
params: &mut impl PollParameters
|
||||
) -> Async<
|
||||
NetworkBehaviourAction<
|
||||
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
|
||||
Self::OutEvent
|
||||
>
|
||||
> {
|
||||
self.inner.poll(params)
|
||||
}
|
||||
|
||||
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
|
||||
self.inner.inject_replaced(peer_id, closed_endpoint, new_endpoint)
|
||||
}
|
||||
|
||||
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
|
||||
self.inner.inject_addr_reach_failure(peer_id, addr, error)
|
||||
}
|
||||
|
||||
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
|
||||
self.inner.inject_dial_failure(peer_id)
|
||||
}
|
||||
|
||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_new_listen_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_expired_listen_addr(addr)
|
||||
}
|
||||
|
||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||
self.inner.inject_new_external_addr(addr)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn two_nodes_transfer_lots_of_packets() {
|
||||
// We spawn two nodes, then make the first one send lots of packets to the second one. The test
|
||||
// ends when the second one has received all of them.
|
||||
|
||||
// Note that if we go too high, we will reach the limit to the number of simultaneous
|
||||
// substreams allowed by the multiplexer.
|
||||
const NUM_PACKETS: u32 = 5000;
|
||||
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
let fut1 = future::poll_fn(move || -> io::Result<_> {
|
||||
loop {
|
||||
match try_ready!(service1.poll()) {
|
||||
Some(LegacyProtoOut::CustomProtocolOpen { peer_id, .. }) => {
|
||||
for n in 0 .. NUM_PACKETS {
|
||||
service1.send_packet(
|
||||
&peer_id,
|
||||
Message::<Block>::ChainSpecific(vec![(n % 256) as u8]).encode()
|
||||
);
|
||||
}
|
||||
},
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut packet_counter = 0u32;
|
||||
let fut2 = future::poll_fn(move || -> io::Result<_> {
|
||||
loop {
|
||||
match try_ready!(service2.poll()) {
|
||||
Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {},
|
||||
Some(LegacyProtoOut::CustomMessage { message, .. }) => {
|
||||
match Message::<Block>::decode(&mut &message[..]).unwrap() {
|
||||
Message::<Block>::ChainSpecific(message) => {
|
||||
assert_eq!(message.len(), 1);
|
||||
packet_counter += 1;
|
||||
if packet_counter == NUM_PACKETS {
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
},
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let combined = fut1.select(fut2).map_err(|(err, _)| err);
|
||||
let _ = tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_two_nodes_requests_in_parallel() {
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
// Generate random messages with or without a request id.
|
||||
let mut to_send = {
|
||||
let mut to_send = Vec::new();
|
||||
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
|
||||
let msg = (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
|
||||
to_send.push(Message::<Block>::ChainSpecific(msg));
|
||||
}
|
||||
to_send
|
||||
};
|
||||
|
||||
// Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we
|
||||
// receive, until the list is empty.
|
||||
let mut to_receive = to_send.clone();
|
||||
to_send.shuffle(&mut rand::thread_rng());
|
||||
|
||||
let fut1 = future::poll_fn(move || -> io::Result<_> {
|
||||
loop {
|
||||
match try_ready!(service1.poll()) {
|
||||
Some(LegacyProtoOut::CustomProtocolOpen { peer_id, .. }) => {
|
||||
for msg in to_send.drain(..) {
|
||||
service1.send_packet(&peer_id, msg.encode());
|
||||
}
|
||||
},
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let fut2 = future::poll_fn(move || -> io::Result<_> {
|
||||
loop {
|
||||
match try_ready!(service2.poll()) {
|
||||
Some(LegacyProtoOut::CustomProtocolOpen { .. }) => {},
|
||||
Some(LegacyProtoOut::CustomMessage { message, .. }) => {
|
||||
let pos = to_receive.iter().position(|m| m.encode() == message).unwrap();
|
||||
to_receive.remove(pos);
|
||||
if to_receive.is_empty() {
|
||||
return Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let combined = fut1.select(fut2).map_err(|(err, _)| err);
|
||||
let _ = tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconnect_after_disconnect() {
|
||||
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
|
||||
// check that the disconnect worked, and finally check whether they successfully reconnect.
|
||||
|
||||
let (mut service1, mut service2) = build_nodes();
|
||||
|
||||
// We use the `current_thread` runtime because it doesn't require us to have `'static` futures.
|
||||
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
|
||||
|
||||
// For this test, the services can be in the following states.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
enum ServiceState { NotConnected, FirstConnec, Disconnected, ConnectedAgain }
|
||||
let mut service1_state = ServiceState::NotConnected;
|
||||
let mut service2_state = ServiceState::NotConnected;
|
||||
|
||||
// Run the events loops.
|
||||
runtime.block_on(future::poll_fn(|| -> Result<_, io::Error> {
|
||||
loop {
|
||||
let mut service1_not_ready = false;
|
||||
|
||||
match service1.poll().unwrap() {
|
||||
Async::Ready(Some(LegacyProtoOut::CustomProtocolOpen { .. })) => {
|
||||
match service1_state {
|
||||
ServiceState::NotConnected => {
|
||||
service1_state = ServiceState::FirstConnec;
|
||||
if service2_state == ServiceState::FirstConnec {
|
||||
service1.disconnect_peer(Swarm::local_peer_id(&service2));
|
||||
}
|
||||
},
|
||||
ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain,
|
||||
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
|
||||
}
|
||||
},
|
||||
Async::Ready(Some(LegacyProtoOut::CustomProtocolClosed { .. })) => {
|
||||
match service1_state {
|
||||
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
|
||||
ServiceState::ConnectedAgain| ServiceState::NotConnected |
|
||||
ServiceState::Disconnected => panic!(),
|
||||
}
|
||||
},
|
||||
Async::NotReady => service1_not_ready = true,
|
||||
_ => panic!()
|
||||
}
|
||||
|
||||
match service2.poll().unwrap() {
|
||||
Async::Ready(Some(LegacyProtoOut::CustomProtocolOpen { .. })) => {
|
||||
match service2_state {
|
||||
ServiceState::NotConnected => {
|
||||
service2_state = ServiceState::FirstConnec;
|
||||
if service1_state == ServiceState::FirstConnec {
|
||||
service1.disconnect_peer(Swarm::local_peer_id(&service2));
|
||||
}
|
||||
},
|
||||
ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain,
|
||||
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
|
||||
}
|
||||
},
|
||||
Async::Ready(Some(LegacyProtoOut::CustomProtocolClosed { .. })) => {
|
||||
match service2_state {
|
||||
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
|
||||
ServiceState::ConnectedAgain| ServiceState::NotConnected |
|
||||
ServiceState::Disconnected => panic!(),
|
||||
}
|
||||
},
|
||||
Async::NotReady if service1_not_ready => break,
|
||||
Async::NotReady => {}
|
||||
_ => panic!()
|
||||
}
|
||||
}
|
||||
|
||||
if service1_state == ServiceState::ConnectedAgain && service2_state == ServiceState::ConnectedAgain {
|
||||
Ok(Async::Ready(()))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
})).unwrap();
|
||||
|
||||
// Do a second 3-seconds run to make sure we don't get disconnected immediately again.
|
||||
let mut delay = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(3));
|
||||
runtime.block_on(future::poll_fn(|| -> Result<_, io::Error> {
|
||||
match service1.poll().unwrap() {
|
||||
Async::NotReady => {},
|
||||
_ => panic!()
|
||||
}
|
||||
|
||||
match service2.poll().unwrap() {
|
||||
Async::NotReady => {},
|
||||
_ => panic!()
|
||||
}
|
||||
|
||||
if let Async::Ready(()) = delay.poll().unwrap() {
|
||||
Ok(Async::Ready(()))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
})).unwrap();
|
||||
}
|
||||
@@ -0,0 +1,292 @@
|
||||
// Copyright 2018-2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::config::ProtocolId;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
|
||||
use libp2p::tokio_codec::Framed;
|
||||
use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
|
||||
use futures::{prelude::*, future, stream};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use unsigned_varint::codec::UviBytes;
|
||||
|
||||
/// Connection upgrade for a single protocol.
|
||||
///
|
||||
/// Note that "a single protocol" here refers to `par` for example. However
|
||||
/// each protocol can have multiple different versions for networking purposes.
|
||||
pub struct RegisteredProtocol {
|
||||
/// Id of the protocol for API purposes.
|
||||
id: ProtocolId,
|
||||
/// Base name of the protocol as advertised on the network.
|
||||
/// Ends with `/` so that we can append a version number behind.
|
||||
base_name: Bytes,
|
||||
/// List of protocol versions that we support.
|
||||
/// Ordered in descending order so that the best comes first.
|
||||
supported_versions: Vec<u8>,
|
||||
}
|
||||
|
||||
impl RegisteredProtocol {
|
||||
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
|
||||
/// passed inside the `RegisteredProtocolOutput`.
|
||||
pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8])
|
||||
-> Self {
|
||||
let protocol = protocol.into();
|
||||
let mut base_name = Bytes::from_static(b"/substrate/");
|
||||
base_name.extend_from_slice(protocol.as_bytes());
|
||||
base_name.extend_from_slice(b"/");
|
||||
|
||||
RegisteredProtocol {
|
||||
base_name,
|
||||
id: protocol,
|
||||
supported_versions: {
|
||||
let mut tmp = versions.to_vec();
|
||||
tmp.sort_unstable_by(|a, b| b.cmp(&a));
|
||||
tmp
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for RegisteredProtocol {
|
||||
fn clone(&self) -> Self {
|
||||
RegisteredProtocol {
|
||||
id: self.id.clone(),
|
||||
base_name: self.base_name.clone(),
|
||||
supported_versions: self.supported_versions.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Output of a `RegisteredProtocol` upgrade.
|
||||
pub struct RegisteredProtocolSubstream<TSubstream> {
|
||||
/// If true, we are in the process of closing the sink.
|
||||
is_closing: bool,
|
||||
/// Whether the local node opened this substream (dialer), or we received this substream from
|
||||
/// the remote (listener).
|
||||
endpoint: Endpoint,
|
||||
/// Buffer of packets to send.
|
||||
send_queue: VecDeque<Vec<u8>>,
|
||||
/// If true, we should call `poll_complete` on the inner sink.
|
||||
requires_poll_complete: bool,
|
||||
/// The underlying substream.
|
||||
inner: stream::Fuse<Framed<Negotiated<TSubstream>, UviBytes<Vec<u8>>>>,
|
||||
/// Version of the protocol that was negotiated.
|
||||
protocol_version: u8,
|
||||
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
|
||||
/// unless the buffer empties then fills itself again.
|
||||
clogged_fuse: bool,
|
||||
}
|
||||
|
||||
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
|
||||
/// Returns the version of the protocol that was negotiated.
|
||||
pub fn protocol_version(&self) -> u8 {
|
||||
self.protocol_version
|
||||
}
|
||||
|
||||
/// Returns whether the local node opened this substream (dialer), or we received this
|
||||
/// substream from the remote (listener).
|
||||
pub fn endpoint(&self) -> Endpoint {
|
||||
self.endpoint
|
||||
}
|
||||
|
||||
/// Starts a graceful shutdown process on this substream.
|
||||
///
|
||||
/// Note that "graceful" means that we sent a closing message. We don't wait for any
|
||||
/// confirmation from the remote.
|
||||
///
|
||||
/// After calling this, the stream is guaranteed to finish soon-ish.
|
||||
pub fn shutdown(&mut self) {
|
||||
self.is_closing = true;
|
||||
self.send_queue.clear();
|
||||
}
|
||||
|
||||
/// Sends a message to the substream.
|
||||
pub fn send_message(&mut self, data: Vec<u8>) {
|
||||
if self.is_closing {
|
||||
return
|
||||
}
|
||||
|
||||
self.send_queue.push_back(data);
|
||||
}
|
||||
}
|
||||
|
||||
/// Event produced by the `RegisteredProtocolSubstream`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RegisteredProtocolEvent {
|
||||
/// Received a message from the remote.
|
||||
Message(BytesMut),
|
||||
|
||||
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
|
||||
/// many messages to it.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<Vec<u8>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
|
||||
where TSubstream: AsyncRead + AsyncWrite {
|
||||
type Item = RegisteredProtocolEvent;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// Flushing the local queue.
|
||||
while let Some(packet) = self.send_queue.pop_front() {
|
||||
match self.inner.start_send(packet)? {
|
||||
AsyncSink::NotReady(packet) => {
|
||||
self.send_queue.push_front(packet);
|
||||
break
|
||||
},
|
||||
AsyncSink::Ready => self.requires_poll_complete = true,
|
||||
}
|
||||
}
|
||||
|
||||
// If we are closing, close as soon as the Sink is closed.
|
||||
if self.is_closing {
|
||||
return Ok(self.inner.close()?.map(|()| None))
|
||||
}
|
||||
|
||||
// Indicating that the remote is clogged if that's the case.
|
||||
if self.send_queue.len() >= 2048 {
|
||||
if !self.clogged_fuse {
|
||||
// Note: this fuse is important not just for preventing us from flooding the logs;
|
||||
// if you remove the fuse, then we will always return early from this function and
|
||||
// thus never read any message from the network.
|
||||
self.clogged_fuse = true;
|
||||
return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged {
|
||||
messages: self.send_queue.iter()
|
||||
.map(|m| m.clone())
|
||||
.collect(),
|
||||
})))
|
||||
}
|
||||
} else {
|
||||
self.clogged_fuse = false;
|
||||
}
|
||||
|
||||
// Flushing if necessary.
|
||||
if self.requires_poll_complete {
|
||||
if let Async::Ready(()) = self.inner.poll_complete()? {
|
||||
self.requires_poll_complete = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Receiving incoming packets.
|
||||
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
|
||||
match self.inner.poll()? {
|
||||
Async::Ready(Some(data)) => {
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data))))
|
||||
}
|
||||
Async::Ready(None) =>
|
||||
if !self.requires_poll_complete && self.send_queue.is_empty() {
|
||||
Ok(Async::Ready(None))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
Async::NotReady => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeInfo for RegisteredProtocol {
|
||||
type Info = RegisteredProtocolName;
|
||||
type InfoIter = VecIntoIter<Self::Info>;
|
||||
|
||||
#[inline]
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
// Report each version as an individual protocol.
|
||||
self.supported_versions.iter().map(|&version| {
|
||||
let num = version.to_string();
|
||||
|
||||
let mut name = self.base_name.clone();
|
||||
name.extend_from_slice(num.as_bytes());
|
||||
RegisteredProtocolName {
|
||||
name,
|
||||
version,
|
||||
}
|
||||
}).collect::<Vec<_>>().into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of `ProtocolName` for a custom protocol.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RegisteredProtocolName {
|
||||
/// Protocol name, as advertised on the wire.
|
||||
name: Bytes,
|
||||
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
|
||||
version: u8,
|
||||
}
|
||||
|
||||
impl ProtocolName for RegisteredProtocolName {
|
||||
fn protocol_name(&self) -> &[u8] {
|
||||
&self.name
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
|
||||
where TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = RegisteredProtocolSubstream<TSubstream>;
|
||||
type Future = future::FutureResult<Self::Output, io::Error>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn upgrade_inbound(
|
||||
self,
|
||||
socket: Negotiated<TSubstream>,
|
||||
info: Self::Info,
|
||||
) -> Self::Future {
|
||||
let framed = {
|
||||
let mut codec = UviBytes::default();
|
||||
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
|
||||
Framed::new(socket, codec)
|
||||
};
|
||||
|
||||
future::ok(RegisteredProtocolSubstream {
|
||||
is_closing: false,
|
||||
endpoint: Endpoint::Listener,
|
||||
send_queue: VecDeque::new(),
|
||||
requires_poll_complete: false,
|
||||
inner: framed.fuse(),
|
||||
protocol_version: info.version,
|
||||
clogged_fuse: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol
|
||||
where TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
|
||||
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
|
||||
type Error = <Self as InboundUpgrade<TSubstream>>::Error;
|
||||
|
||||
fn upgrade_outbound(
|
||||
self,
|
||||
socket: Negotiated<TSubstream>,
|
||||
info: Self::Info,
|
||||
) -> Self::Future {
|
||||
let framed = Framed::new(socket, UviBytes::default());
|
||||
|
||||
future::ok(RegisteredProtocolSubstream {
|
||||
is_closing: false,
|
||||
endpoint: Endpoint::Dialer,
|
||||
send_queue: VecDeque::new(),
|
||||
requires_poll_complete: false,
|
||||
inner: framed.fuse(),
|
||||
protocol_version: info.version,
|
||||
clogged_fuse: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user