Remove legacy network protocol (#8296)

This commit is contained in:
Pierre Krieger
2021-03-11 14:57:06 +01:00
committed by GitHub
parent 8f2517b06e
commit 5f004b4428
6 changed files with 47 additions and 617 deletions
+1 -99
View File
@@ -24,7 +24,7 @@ use crate::{
utils::{interval, LruHashSet},
};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use codec::{Decode, DecodeAll, Encode};
use futures::{channel::oneshot, prelude::*};
use generic_proto::{GenericProto, GenericProtoOut};
@@ -75,11 +75,6 @@ const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
// Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`.
pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024;
/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;
/// Identifier of the peerset for the block announces protocol.
const HARDCODED_PEERSETS_SYNC: sc_peerset::SetId = sc_peerset::SetId::from(0);
/// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or
@@ -254,26 +249,6 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
}
}
/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol.
fn build_status_message<B: BlockT>(
protocol_config: &ProtocolConfig,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> Vec<u8> {
let status = message::generic::Status {
version: CURRENT_VERSION,
min_supported_version: MIN_VERSION,
genesis_hash,
roles: protocol_config.roles.into(),
best_number,
best_hash,
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};
Message::<B>::Status(status).encode()
}
impl<B: BlockT> Protocol<B> {
/// Create a new instance.
pub fn new(
@@ -375,8 +350,6 @@ impl<B: BlockT> Protocol<B> {
});
let behaviour = {
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let best_number = info.best_number;
let best_hash = info.best_hash;
let genesis_hash = info.genesis_hash;
@@ -389,9 +362,6 @@ impl<B: BlockT> Protocol<B> {
).encode();
GenericProto::new(
protocol_id.clone(),
versions,
build_status_message::<B>(&config, best_number, best_hash, genesis_hash),
peerset,
iter::once((block_announces_protocol, block_announces_handshake, MAX_BLOCK_ANNOUNCE_SIZE))
.chain(network_config.extra_sets.iter()
@@ -511,9 +481,6 @@ impl<B: BlockT> Protocol<B> {
self.sync.update_chain_info(&hash, number);
self.behaviour.set_legacy_handshake_message(
build_status_message::<B>(&self.config, number, hash, self.genesis_hash),
);
self.behaviour.set_notif_protocol_handshake(
HARDCODED_PEERSETS_SYNC,
BlockAnnouncesHandshake::<B>::build(
@@ -539,64 +506,6 @@ impl<B: BlockT> Protocol<B> {
self.peers.iter().map(|(id, peer)| (id, &peer.info))
}
fn on_custom_message(
&mut self,
who: PeerId,
data: BytesMut,
) -> CustomMessageOutcome<B> {
let message = match <Message<B> as Decode>::decode(&mut &data[..]) {
Ok(message) => message,
Err(err) => {
debug!(
target: "sync",
"Couldn't decode packet sent by {}: {:?}: {}",
who,
data,
err,
);
self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
return CustomMessageOutcome::None;
}
};
match message {
GenericMessage::Status(_) =>
debug!(target: "sub-libp2p", "Received unexpected Status"),
GenericMessage::BlockAnnounce(announce) =>
self.push_block_announce_validation(who.clone(), announce),
GenericMessage::Transactions(_) =>
warn!(target: "sub-libp2p", "Received unexpected Transactions"),
GenericMessage::BlockResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected BlockResponse"),
GenericMessage::RemoteCallResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
GenericMessage::RemoteReadResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"),
GenericMessage::RemoteHeaderResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"),
GenericMessage::RemoteChangesResponse(_) =>
warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
GenericMessage::BlockRequest(_) |
GenericMessage::RemoteReadChildRequest(_) |
GenericMessage::RemoteCallRequest(_) |
GenericMessage::RemoteReadRequest(_) |
GenericMessage::RemoteHeaderRequest(_) |
GenericMessage::RemoteChangesRequest(_) |
GenericMessage::Consensus(_) |
GenericMessage::ConsensusBatch(_) => {
debug!(
target: "sub-libp2p",
"Received no longer supported legacy request from {:?}",
who
);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(who, rep::BAD_PROTOCOL);
},
}
CustomMessageOutcome::None
}
fn prepare_block_request(
&mut self,
who: PeerId,
@@ -1547,13 +1456,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
}
}
},
GenericProtoOut::LegacyMessage { peer_id, message } => {
if self.peers.contains_key(&peer_id) {
self.on_custom_message(peer_id, message)
} else {
CustomMessageOutcome::None
}
},
GenericProtoOut::Notification { peer_id, set_id, message } =>
match set_id {
HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => {
@@ -16,10 +16,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::config::ProtocolId;
use crate::protocol::generic_proto::{
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn},
upgrade::RegisteredProtocol
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}
};
use bytes::BytesMut;
@@ -97,9 +95,6 @@ use wasm_timer::Instant;
/// accommodates for any number of connections.
///
pub struct GenericProto {
/// Legacy protocol to open with peers. Never modified.
legacy_protocol: RegisteredProtocol,
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
@@ -346,14 +341,6 @@ pub enum GenericProtoOut {
set_id: sc_peerset::SetId,
},
/// Receives a message on the legacy substream.
LegacyMessage {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Message that has been received.
message: BytesMut,
},
/// Receives a message on a custom protocol substream.
///
/// Also concerns received notifications for the notifications API.
@@ -370,9 +357,6 @@ pub enum GenericProtoOut {
impl GenericProto {
/// Creates a `CustomProtos`.
pub fn new(
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
) -> Self {
@@ -382,11 +366,7 @@ impl GenericProto {
assert!(!notif_protocols.is_empty());
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);
GenericProto {
legacy_protocol,
notif_protocols,
peerset,
peers: FnvHashMap::default(),
@@ -412,14 +392,6 @@ impl GenericProto {
}
}
/// Modifies the handshake of the legacy protocol.
pub fn set_legacy_handshake_message(
&mut self,
handshake_message: impl Into<Vec<u8>>
) {
*self.legacy_protocol.handshake_message().write() = handshake_message.into();
}
/// Returns the number of discovered nodes that we keep in memory.
pub fn num_discovered_peers(&self) -> usize {
self.peerset.num_discovered_peers()
@@ -1046,10 +1018,7 @@ impl NetworkBehaviour for GenericProto {
type OutEvent = GenericProtoOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
NotifsHandlerProto::new(
self.legacy_protocol.clone(),
self.notif_protocols.clone(),
)
NotifsHandlerProto::new(self.notif_protocols.clone())
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
@@ -1900,25 +1869,6 @@ impl NetworkBehaviour for GenericProto {
};
}
NotifsHandlerOut::CustomMessage { message } => {
if self.is_open(&source, sc_peerset::SetId::from(0)) { // TODO: using set 0 here is hacky
trace!(target: "sub-libp2p", "Handler({:?}) => Message", source);
trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
let event = GenericProtoOut::LegacyMessage {
peer_id: source,
message,
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Post-close message. Dropping message.",
source,
);
}
}
NotifsHandlerOut::Notification { protocol_index, message } => {
let set_id = sc_peerset::SetId::from(protocol_index);
if self.is_open(&source, set_id) {
@@ -60,14 +60,12 @@
use crate::protocol::generic_proto::{
upgrade::{
NotificationsIn, NotificationsOut, NotificationsInSubstream, NotificationsOutSubstream,
NotificationsHandshakeError, RegisteredProtocol, RegisteredProtocolSubstream,
RegisteredProtocolEvent, UpgradeCollec
NotificationsHandshakeError, UpgradeCollec
},
};
use bytes::BytesMut;
use libp2p::core::{either::EitherOutput, ConnectedPoint, PeerId};
use libp2p::core::upgrade::{SelectUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p::core::{ConnectedPoint, PeerId, upgrade::{InboundUpgrade, OutboundUpgrade}};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
@@ -83,7 +81,6 @@ use futures::{
};
use log::error;
use parking_lot::{Mutex, RwLock};
use smallvec::SmallVec;
use std::{borrow::Cow, collections::VecDeque, mem, pin::Pin, str, sync::Arc, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
@@ -114,9 +111,6 @@ pub struct NotifsHandlerProto {
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
/// send or respond with in the handshake.
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,
/// Configuration for the legacy protocol upgrade.
legacy_protocol: RegisteredProtocol,
}
/// The actual handler once the connection has been established.
@@ -135,15 +129,6 @@ pub struct NotifsHandler {
/// Remote we are connected to.
peer_id: PeerId,
/// Configuration for the legacy protocol upgrade.
legacy_protocol: RegisteredProtocol,
/// The substreams where bidirectional communications happen.
legacy_substreams: SmallVec<[RegisteredProtocolSubstream<NegotiatedSubstream>; 4]>,
/// Contains substreams which are being shut down.
legacy_shutdown: SmallVec<[RegisteredProtocolSubstream<NegotiatedSubstream>; 4]>,
/// Events to return in priority from `poll`.
events_queue: VecDeque<
ProtocolsHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>
@@ -227,12 +212,10 @@ enum State {
impl IntoProtocolsHandler for NotifsHandlerProto {
type Handler = NotifsHandler;
fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
let protocols = self.protocols.iter()
fn inbound_protocol(&self) -> UpgradeCollec<NotificationsIn> {
self.protocols.iter()
.map(|(_, p, _, _)| p.clone())
.collect::<UpgradeCollec<_>>();
SelectUpgrade::new(protocols, self.legacy_protocol.clone())
.collect::<UpgradeCollec<_>>()
}
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
@@ -251,9 +234,6 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
peer_id: peer_id.clone(),
endpoint: connected_point.clone(),
when_connection_open: Instant::now(),
legacy_protocol: self.legacy_protocol,
legacy_substreams: SmallVec::new(),
legacy_shutdown: SmallVec::new(),
events_queue: VecDeque::with_capacity(16),
}
}
@@ -332,17 +312,6 @@ pub enum NotifsHandlerOut {
protocol_index: usize,
},
/// Received a non-gossiping message on the legacy substream.
///
/// Can only happen when the handler is in the open state.
CustomMessage {
/// Message that has been received.
///
/// Keep in mind that this can be a `ConsensusMessage` message, which then contains a
/// notification.
message: BytesMut,
},
/// Received a message on a custom protocol substream.
///
/// Can only happen when the handler is in the open state.
@@ -476,7 +445,6 @@ impl NotifsHandlerProto {
/// is always the same whether we open a substream ourselves or respond to handshake from
/// the remote.
pub fn new(
legacy_protocol: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
) -> Self {
let protocols = list
@@ -489,7 +457,6 @@ impl NotifsHandlerProto {
NotifsHandlerProto {
protocols,
legacy_protocol,
}
}
}
@@ -498,7 +465,7 @@ impl ProtocolsHandler for NotifsHandler {
type InEvent = NotifsHandlerIn;
type OutEvent = NotifsHandlerOut;
type Error = NotifsHandlerError;
type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>;
type InboundProtocol = UpgradeCollec<NotificationsIn>;
type OutboundProtocol = NotificationsOut;
// Index within the `out_protocols`.
type OutboundOpenInfo = usize;
@@ -509,69 +476,51 @@ impl ProtocolsHandler for NotifsHandler {
.map(|p| p.in_upgrade.clone())
.collect::<UpgradeCollec<_>>();
let with_legacy = SelectUpgrade::new(protocols, self.legacy_protocol.clone());
SubstreamProtocol::new(with_legacy, ())
SubstreamProtocol::new(protocols, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
((_remote_handshake, mut new_substream), protocol_index):
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
match out {
// Received notifications substream.
EitherOutput::First(((_remote_handshake, mut new_substream), protocol_index)) => {
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed { pending_opening } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
}
));
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed { pending_opening } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
}
));
protocol_info.state = State::OpenDesiredByRemote {
in_substream: new_substream,
pending_opening,
};
},
State::OpenDesiredByRemote { .. } => {
// If a substream already exists, silently drop the new one.
// Note that we drop the substream, which will send an equivalent to a
// TCP "RST" to the remote and force-close the substream. It might
// seem like an unclean way to get rid of a substream. However, keep
// in mind that it is invalid for the remote to open multiple such
// substreams, and therefore sending a "RST" is the most correct thing
// to do.
return;
},
State::Opening { ref mut in_substream, .. } |
State::Open { ref mut in_substream, .. } => {
if in_substream.is_some() {
// Same remark as above.
return;
}
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
},
protocol_info.state = State::OpenDesiredByRemote {
in_substream: new_substream,
pending_opening,
};
}
// Received legacy substream.
EitherOutput::Second((substream, _handshake)) => {
// Note: while we awknowledge legacy substreams and handle incoming messages,
// it doesn't trigger any `OpenDesiredByRemote` event as a way to simplify the
// logic of this code.
// Since mid-2019, legacy substreams are supposed to be used at the same time as
// notifications substreams, and not in isolation. Nodes that open legacy
// substreams in isolation are considered deprecated.
if self.legacy_substreams.len() <= 4 {
self.legacy_substreams.push(substream);
},
State::OpenDesiredByRemote { .. } => {
// If a substream already exists, silently drop the new one.
// Note that we drop the substream, which will send an equivalent to a
// TCP "RST" to the remote and force-close the substream. It might
// seem like an unclean way to get rid of a substream. However, keep
// in mind that it is invalid for the remote to open multiple such
// substreams, and therefore sending a "RST" is the most correct thing
// to do.
return;
},
State::Opening { ref mut in_substream, .. } |
State::Open { ref mut in_substream, .. } => {
if in_substream.is_some() {
// Same remark as above.
return;
}
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
},
}
}
@@ -683,11 +632,6 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::Close { protocol_index } => {
for mut substream in self.legacy_substreams.drain(..) {
substream.shutdown();
self.legacy_shutdown.push(substream);
}
match self.protocols[protocol_index].state {
State::Open { .. } => {
self.protocols[protocol_index].state = State::Closed {
@@ -752,10 +696,6 @@ impl ProtocolsHandler for NotifsHandler {
}
fn connection_keep_alive(&self) -> KeepAlive {
if !self.legacy_substreams.is_empty() {
return KeepAlive::Yes;
}
// `Yes` if any protocol has some activity.
if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
return KeepAlive::Yes;
@@ -883,68 +823,8 @@ impl ProtocolsHandler for NotifsHandler {
}
}
}
// The legacy substreams are polled only if the state is `Open`. Otherwise, it would be
// possible to receive notifications that would need to get silently discarded.
if matches!(self.protocols[0].state, State::Open { .. }) {
for n in (0..self.legacy_substreams.len()).rev() {
let mut substream = self.legacy_substreams.swap_remove(n);
let poll_outcome = Pin::new(&mut substream).poll_next(cx);
match poll_outcome {
Poll::Pending => self.legacy_substreams.push(substream),
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Message(message)))) => {
self.legacy_substreams.push(substream);
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
))
},
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(
NotifsHandlerError::SyncNotificationsClogged
))
}
Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
if matches!(poll_outcome, Poll::Ready(None)) {
self.legacy_shutdown.push(substream);
}
if let State::Open { out_substream, .. } = &mut self.protocols[0].state {
if !out_substream.is_some() {
*out_substream = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired {
protocol_index: 0,
}
))
}
}
}
}
}
}
}
shutdown_list(&mut self.legacy_shutdown, cx);
Poll::Pending
}
}
/// Given a list of substreams, tries to shut them down. The substreams that have been successfully
/// shut down are removed from the list.
fn shutdown_list
(list: &mut SmallVec<impl smallvec::Array<Item = RegisteredProtocolSubstream<NegotiatedSubstream>>>,
cx: &mut Context)
{
'outer: for n in (0..list.len()).rev() {
let mut substream = list.swap_remove(n);
loop {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(_))) => {}
Poll::Pending => break,
Poll::Ready(Some(Err(_))) | Poll::Ready(None) => continue 'outer,
}
}
list.push(substream);
}
}
@@ -80,10 +80,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(
"test", &[1], vec![], peerset,
iter::once(("/foo".into(), Vec::new(), 1024 * 1024))
),
inner: GenericProto::new(peerset, iter::once(("/foo".into(), Vec::new(), 1024 * 1024))),
addrs: addrs
.iter()
.enumerate()
@@ -15,13 +15,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub use self::collec::UpgradeCollec;
pub use self::legacy::{
RegisteredProtocol,
RegisteredProtocolEvent,
RegisteredProtocolName,
RegisteredProtocolSubstream
};
pub use self::notifications::{
NotificationsIn,
NotificationsInSubstream,
@@ -32,5 +27,4 @@ pub use self::notifications::{
};
mod collec;
mod legacy;
mod notifications;
@@ -1,293 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::config::ProtocolId;
use bytes::BytesMut;
use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use parking_lot::RwLock;
use std::{collections::VecDeque, io, pin::Pin, sync::Arc, vec::IntoIter as VecIntoIter};
use std::task::{Context, Poll};
use unsigned_varint::codec::UviBytes;
/// Connection upgrade for a single protocol.
///
/// Note that "a single protocol" here refers to `par` for example. However
/// each protocol can have multiple different versions for networking purposes.
pub struct RegisteredProtocol {
/// Id of the protocol for API purposes.
id: ProtocolId,
/// Base name of the protocol as advertised on the network.
/// Ends with `/` so that we can append a version number behind.
base_name: Vec<u8>,
/// List of protocol versions that we support.
/// Ordered in descending order so that the best comes first.
supported_versions: Vec<u8>,
/// Handshake to send after the substream is open.
handshake_message: Arc<RwLock<Vec<u8>>>,
}
impl RegisteredProtocol {
/// Creates a new `RegisteredProtocol`.
pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8], handshake_message: Arc<RwLock<Vec<u8>>>)
-> Self {
let protocol = protocol.into();
let mut base_name = b"/substrate/".to_vec();
base_name.extend_from_slice(protocol.as_ref().as_bytes());
base_name.extend_from_slice(b"/");
RegisteredProtocol {
base_name,
id: protocol,
supported_versions: {
let mut tmp = versions.to_vec();
tmp.sort_by(|a, b| b.cmp(&a));
tmp
},
handshake_message,
}
}
/// Returns the `Arc` to the handshake message that was passed at initialization.
pub fn handshake_message(&self) -> &Arc<RwLock<Vec<u8>>> {
&self.handshake_message
}
}
impl Clone for RegisteredProtocol {
fn clone(&self) -> Self {
RegisteredProtocol {
id: self.id.clone(),
base_name: self.base_name.clone(),
supported_versions: self.supported_versions.clone(),
handshake_message: self.handshake_message.clone(),
}
}
}
/// Output of a `RegisteredProtocol` upgrade.
pub struct RegisteredProtocolSubstream<TSubstream> {
/// If true, we are in the process of closing the sink.
is_closing: bool,
/// Buffer of packets to send.
send_queue: VecDeque<BytesMut>,
/// If true, we should call `poll_complete` on the inner sink.
requires_poll_flush: bool,
/// The underlying substream.
inner: stream::Fuse<Framed<TSubstream, UviBytes<BytesMut>>>,
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
}
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
/// Starts a graceful shutdown process on this substream.
///
/// Note that "graceful" means that we sent a closing message. We don't wait for any
/// confirmation from the remote.
///
/// After calling this, the stream is guaranteed to finish soon-ish.
pub fn shutdown(&mut self) {
self.is_closing = true;
self.send_queue.clear();
}
}
/// Event produced by the `RegisteredProtocolSubstream`.
#[derive(Debug, Clone)]
pub enum RegisteredProtocolEvent {
/// Received a message from the remote.
Message(BytesMut),
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
/// many messages to it.
Clogged,
}
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin {
type Item = Result<RegisteredProtocolEvent, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Flushing the local queue.
while !self.send_queue.is_empty() {
match Pin::new(&mut self.inner).poll_ready(cx) {
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
Poll::Pending => break,
}
if let Some(packet) = self.send_queue.pop_front() {
Pin::new(&mut self.inner).start_send(packet)?;
self.requires_poll_flush = true;
}
}
// If we are closing, close as soon as the Sink is closed.
if self.is_closing {
return match Pin::new(&mut self.inner).poll_close(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(None),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
}
}
// Indicating that the remote is clogged if that's the case.
if self.send_queue.len() >= 1536 {
if !self.clogged_fuse {
// Note: this fuse is important not just for preventing us from flooding the logs;
// if you remove the fuse, then we will always return early from this function and
// thus never read any message from the network.
self.clogged_fuse = true;
return Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged)))
}
} else {
self.clogged_fuse = false;
}
// Flushing if necessary.
if self.requires_poll_flush {
if let Poll::Ready(()) = Pin::new(&mut self.inner).poll_flush(cx)? {
self.requires_poll_flush = false;
}
}
// Receiving incoming packets.
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
match Pin::new(&mut self.inner).poll_next(cx)? {
Poll::Ready(Some(data)) => {
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Message(data))))
}
Poll::Ready(None) =>
if !self.requires_poll_flush && self.send_queue.is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
}
}
impl UpgradeInfo for RegisteredProtocol {
type Info = RegisteredProtocolName;
type InfoIter = VecIntoIter<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&version| {
let num = version.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
RegisteredProtocolName {
name,
version,
}
}).collect::<Vec<_>>().into_iter()
}
}
/// Implementation of `ProtocolName` for a custom protocol.
#[derive(Debug, Clone)]
pub struct RegisteredProtocolName {
/// Protocol name, as advertised on the wire.
name: Vec<u8>,
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
version: u8,
}
impl ProtocolName for RegisteredProtocolName {
fn protocol_name(&self) -> &[u8] {
&self.name
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = (RegisteredProtocolSubstream<TSubstream>, Vec<u8>);
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, io::Error>> + Send>>;
type Error = io::Error;
fn upgrade_inbound(
self,
socket: TSubstream,
_: Self::Info,
) -> Self::Future {
Box::pin(async move {
let mut framed = {
let mut codec = UviBytes::default();
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
Framed::new(socket, codec)
};
let handshake = BytesMut::from(&self.handshake_message.read()[..]);
framed.send(handshake).await?;
let received_handshake = framed.next().await
.ok_or_else(|| io::ErrorKind::UnexpectedEof)??;
Ok((RegisteredProtocolSubstream {
is_closing: false,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
clogged_fuse: false,
}, received_handshake.to_vec()))
})
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
type Error = <Self as InboundUpgrade<TSubstream>>::Error;
fn upgrade_outbound(
self,
socket: TSubstream,
_: Self::Info,
) -> Self::Future {
Box::pin(async move {
let mut framed = {
let mut codec = UviBytes::default();
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
Framed::new(socket, codec)
};
let handshake = BytesMut::from(&self.handshake_message.read()[..]);
framed.send(handshake).await?;
let received_handshake = framed.next().await
.ok_or_else(|| {
io::Error::new(io::ErrorKind::UnexpectedEof, "Failed to receive handshake")
})??;
Ok((RegisteredProtocolSubstream {
is_closing: false,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
clogged_fuse: false,
}, received_handshake.to_vec()))
})
}
}