From 5f004b44284c75622a4778f82b27809cec6186f6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 11 Mar 2021 14:57:06 +0100 Subject: [PATCH] Remove legacy network protocol (#8296) --- substrate/client/network/src/protocol.rs | 100 +----- .../src/protocol/generic_proto/behaviour.rs | 54 +--- .../src/protocol/generic_proto/handler.rs | 204 +++--------- .../src/protocol/generic_proto/tests.rs | 5 +- .../src/protocol/generic_proto/upgrade.rs | 8 +- .../protocol/generic_proto/upgrade/legacy.rs | 293 ------------------ 6 files changed, 47 insertions(+), 617 deletions(-) delete mode 100644 substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index bddd79269f..7f321775b1 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -24,7 +24,7 @@ use crate::{ utils::{interval, LruHashSet}, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use codec::{Decode, DecodeAll, Encode}; use futures::{channel::oneshot, prelude::*}; use generic_proto::{GenericProto, GenericProtoOut}; @@ -75,11 +75,6 @@ const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; // Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`. pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024; -/// Current protocol version. -pub(crate) const CURRENT_VERSION: u32 = 6; -/// Lowest version we support -pub(crate) const MIN_VERSION: u32 = 3; - /// Identifier of the peerset for the block announces protocol. const HARDCODED_PEERSETS_SYNC: sc_peerset::SetId = sc_peerset::SetId::from(0); /// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or @@ -254,26 +249,6 @@ impl BlockAnnouncesHandshake { } } -/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol. -fn build_status_message( - protocol_config: &ProtocolConfig, - best_number: NumberFor, - best_hash: B::Hash, - genesis_hash: B::Hash, -) -> Vec { - let status = message::generic::Status { - version: CURRENT_VERSION, - min_supported_version: MIN_VERSION, - genesis_hash, - roles: protocol_config.roles.into(), - best_number, - best_hash, - chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible - }; - - Message::::Status(status).encode() -} - impl Protocol { /// Create a new instance. pub fn new( @@ -375,8 +350,6 @@ impl Protocol { }); let behaviour = { - let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let best_number = info.best_number; let best_hash = info.best_hash; let genesis_hash = info.genesis_hash; @@ -389,9 +362,6 @@ impl Protocol { ).encode(); GenericProto::new( - protocol_id.clone(), - versions, - build_status_message::(&config, best_number, best_hash, genesis_hash), peerset, iter::once((block_announces_protocol, block_announces_handshake, MAX_BLOCK_ANNOUNCE_SIZE)) .chain(network_config.extra_sets.iter() @@ -511,9 +481,6 @@ impl Protocol { self.sync.update_chain_info(&hash, number); - self.behaviour.set_legacy_handshake_message( - build_status_message::(&self.config, number, hash, self.genesis_hash), - ); self.behaviour.set_notif_protocol_handshake( HARDCODED_PEERSETS_SYNC, BlockAnnouncesHandshake::::build( @@ -539,64 +506,6 @@ impl Protocol { self.peers.iter().map(|(id, peer)| (id, &peer.info)) } - fn on_custom_message( - &mut self, - who: PeerId, - data: BytesMut, - ) -> CustomMessageOutcome { - let message = match as Decode>::decode(&mut &data[..]) { - Ok(message) => message, - Err(err) => { - debug!( - target: "sync", - "Couldn't decode packet sent by {}: {:?}: {}", - who, - data, - err, - ); - self.peerset_handle.report_peer(who, rep::BAD_MESSAGE); - return CustomMessageOutcome::None; - } - }; - - match message { - GenericMessage::Status(_) => - debug!(target: "sub-libp2p", "Received unexpected Status"), - GenericMessage::BlockAnnounce(announce) => - self.push_block_announce_validation(who.clone(), announce), - GenericMessage::Transactions(_) => - warn!(target: "sub-libp2p", "Received unexpected Transactions"), - GenericMessage::BlockResponse(_) => - warn!(target: "sub-libp2p", "Received unexpected BlockResponse"), - GenericMessage::RemoteCallResponse(_) => - warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"), - GenericMessage::RemoteReadResponse(_) => - warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"), - GenericMessage::RemoteHeaderResponse(_) => - warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"), - GenericMessage::RemoteChangesResponse(_) => - warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"), - GenericMessage::BlockRequest(_) | - GenericMessage::RemoteReadChildRequest(_) | - GenericMessage::RemoteCallRequest(_) | - GenericMessage::RemoteReadRequest(_) | - GenericMessage::RemoteHeaderRequest(_) | - GenericMessage::RemoteChangesRequest(_) | - GenericMessage::Consensus(_) | - GenericMessage::ConsensusBatch(_) => { - debug!( - target: "sub-libp2p", - "Received no longer supported legacy request from {:?}", - who - ); - self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); - self.peerset_handle.report_peer(who, rep::BAD_PROTOCOL); - }, - } - - CustomMessageOutcome::None - } - fn prepare_block_request( &mut self, who: PeerId, @@ -1547,13 +1456,6 @@ impl NetworkBehaviour for Protocol { } } }, - GenericProtoOut::LegacyMessage { peer_id, message } => { - if self.peers.contains_key(&peer_id) { - self.on_custom_message(peer_id, message) - } else { - CustomMessageOutcome::None - } - }, GenericProtoOut::Notification { peer_id, set_id, message } => match set_id { HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => { diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index 3283ea33a0..77a54e09ea 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -16,10 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::config::ProtocolId; use crate::protocol::generic_proto::{ - handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}, - upgrade::RegisteredProtocol + handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn} }; use bytes::BytesMut; @@ -97,9 +95,6 @@ use wasm_timer::Instant; /// accommodates for any number of connections. /// pub struct GenericProto { - /// Legacy protocol to open with peers. Never modified. - legacy_protocol: RegisteredProtocol, - /// Notification protocols. Entries are only ever added and not removed. /// Contains, for each protocol, the protocol name and the message to send as part of the /// initial handshake. @@ -346,14 +341,6 @@ pub enum GenericProtoOut { set_id: sc_peerset::SetId, }, - /// Receives a message on the legacy substream. - LegacyMessage { - /// Id of the peer the message came from. - peer_id: PeerId, - /// Message that has been received. - message: BytesMut, - }, - /// Receives a message on a custom protocol substream. /// /// Also concerns received notifications for the notifications API. @@ -370,9 +357,6 @@ pub enum GenericProtoOut { impl GenericProto { /// Creates a `CustomProtos`. pub fn new( - protocol: impl Into, - versions: &[u8], - handshake_message: Vec, peerset: sc_peerset::Peerset, notif_protocols: impl Iterator, Vec, u64)>, ) -> Self { @@ -382,11 +366,7 @@ impl GenericProto { assert!(!notif_protocols.is_empty()); - let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); - let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); - GenericProto { - legacy_protocol, notif_protocols, peerset, peers: FnvHashMap::default(), @@ -412,14 +392,6 @@ impl GenericProto { } } - /// Modifies the handshake of the legacy protocol. - pub fn set_legacy_handshake_message( - &mut self, - handshake_message: impl Into> - ) { - *self.legacy_protocol.handshake_message().write() = handshake_message.into(); - } - /// Returns the number of discovered nodes that we keep in memory. pub fn num_discovered_peers(&self) -> usize { self.peerset.num_discovered_peers() @@ -1046,10 +1018,7 @@ impl NetworkBehaviour for GenericProto { type OutEvent = GenericProtoOut; fn new_handler(&mut self) -> Self::ProtocolsHandler { - NotifsHandlerProto::new( - self.legacy_protocol.clone(), - self.notif_protocols.clone(), - ) + NotifsHandlerProto::new(self.notif_protocols.clone()) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { @@ -1900,25 +1869,6 @@ impl NetworkBehaviour for GenericProto { }; } - NotifsHandlerOut::CustomMessage { message } => { - if self.is_open(&source, sc_peerset::SetId::from(0)) { // TODO: using set 0 here is hacky - trace!(target: "sub-libp2p", "Handler({:?}) => Message", source); - trace!(target: "sub-libp2p", "External API <= Message({:?})", source); - let event = GenericProtoOut::LegacyMessage { - peer_id: source, - message, - }; - - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); - } else { - trace!( - target: "sub-libp2p", - "Handler({:?}) => Post-close message. Dropping message.", - source, - ); - } - } - NotifsHandlerOut::Notification { protocol_index, message } => { let set_id = sc_peerset::SetId::from(protocol_index); if self.is_open(&source, set_id) { diff --git a/substrate/client/network/src/protocol/generic_proto/handler.rs b/substrate/client/network/src/protocol/generic_proto/handler.rs index 6fdcef1d7a..0db249f90a 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler.rs @@ -60,14 +60,12 @@ use crate::protocol::generic_proto::{ upgrade::{ NotificationsIn, NotificationsOut, NotificationsInSubstream, NotificationsOutSubstream, - NotificationsHandshakeError, RegisteredProtocol, RegisteredProtocolSubstream, - RegisteredProtocolEvent, UpgradeCollec + NotificationsHandshakeError, UpgradeCollec }, }; use bytes::BytesMut; -use libp2p::core::{either::EitherOutput, ConnectedPoint, PeerId}; -use libp2p::core::upgrade::{SelectUpgrade, InboundUpgrade, OutboundUpgrade}; +use libp2p::core::{ConnectedPoint, PeerId, upgrade::{InboundUpgrade, OutboundUpgrade}}; use libp2p::swarm::{ ProtocolsHandler, ProtocolsHandlerEvent, IntoProtocolsHandler, @@ -83,7 +81,6 @@ use futures::{ }; use log::error; use parking_lot::{Mutex, RwLock}; -use smallvec::SmallVec; use std::{borrow::Cow, collections::VecDeque, mem, pin::Pin, str, sync::Arc, task::{Context, Poll}, time::Duration}; use wasm_timer::Instant; @@ -114,9 +111,6 @@ pub struct NotifsHandlerProto { /// Name of protocols, prototypes for upgrades for inbound substreams, and the message we /// send or respond with in the handshake. protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc>>, u64)>, - - /// Configuration for the legacy protocol upgrade. - legacy_protocol: RegisteredProtocol, } /// The actual handler once the connection has been established. @@ -135,15 +129,6 @@ pub struct NotifsHandler { /// Remote we are connected to. peer_id: PeerId, - /// Configuration for the legacy protocol upgrade. - legacy_protocol: RegisteredProtocol, - - /// The substreams where bidirectional communications happen. - legacy_substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, - - /// Contains substreams which are being shut down. - legacy_shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, - /// Events to return in priority from `poll`. events_queue: VecDeque< ProtocolsHandlerEvent @@ -227,12 +212,10 @@ enum State { impl IntoProtocolsHandler for NotifsHandlerProto { type Handler = NotifsHandler; - fn inbound_protocol(&self) -> SelectUpgrade, RegisteredProtocol> { - let protocols = self.protocols.iter() + fn inbound_protocol(&self) -> UpgradeCollec { + self.protocols.iter() .map(|(_, p, _, _)| p.clone()) - .collect::>(); - - SelectUpgrade::new(protocols, self.legacy_protocol.clone()) + .collect::>() } fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { @@ -251,9 +234,6 @@ impl IntoProtocolsHandler for NotifsHandlerProto { peer_id: peer_id.clone(), endpoint: connected_point.clone(), when_connection_open: Instant::now(), - legacy_protocol: self.legacy_protocol, - legacy_substreams: SmallVec::new(), - legacy_shutdown: SmallVec::new(), events_queue: VecDeque::with_capacity(16), } } @@ -332,17 +312,6 @@ pub enum NotifsHandlerOut { protocol_index: usize, }, - /// Received a non-gossiping message on the legacy substream. - /// - /// Can only happen when the handler is in the open state. - CustomMessage { - /// Message that has been received. - /// - /// Keep in mind that this can be a `ConsensusMessage` message, which then contains a - /// notification. - message: BytesMut, - }, - /// Received a message on a custom protocol substream. /// /// Can only happen when the handler is in the open state. @@ -476,7 +445,6 @@ impl NotifsHandlerProto { /// is always the same whether we open a substream ourselves or respond to handshake from /// the remote. pub fn new( - legacy_protocol: RegisteredProtocol, list: impl Into, Arc>>, u64)>>, ) -> Self { let protocols = list @@ -489,7 +457,6 @@ impl NotifsHandlerProto { NotifsHandlerProto { protocols, - legacy_protocol, } } } @@ -498,7 +465,7 @@ impl ProtocolsHandler for NotifsHandler { type InEvent = NotifsHandlerIn; type OutEvent = NotifsHandlerOut; type Error = NotifsHandlerError; - type InboundProtocol = SelectUpgrade, RegisteredProtocol>; + type InboundProtocol = UpgradeCollec; type OutboundProtocol = NotificationsOut; // Index within the `out_protocols`. type OutboundOpenInfo = usize; @@ -509,69 +476,51 @@ impl ProtocolsHandler for NotifsHandler { .map(|p| p.in_upgrade.clone()) .collect::>(); - let with_legacy = SelectUpgrade::new(protocols, self.legacy_protocol.clone()); - SubstreamProtocol::new(with_legacy, ()) + SubstreamProtocol::new(protocols, ()) } fn inject_fully_negotiated_inbound( &mut self, - out: >::Output, + ((_remote_handshake, mut new_substream), protocol_index): + >::Output, (): () ) { - match out { - // Received notifications substream. - EitherOutput::First(((_remote_handshake, mut new_substream), protocol_index)) => { - let mut protocol_info = &mut self.protocols[protocol_index]; - match protocol_info.state { - State::Closed { pending_opening } => { - self.events_queue.push_back(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::OpenDesiredByRemote { - protocol_index, - } - )); + let mut protocol_info = &mut self.protocols[protocol_index]; + match protocol_info.state { + State::Closed { pending_opening } => { + self.events_queue.push_back(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::OpenDesiredByRemote { + protocol_index, + } + )); - protocol_info.state = State::OpenDesiredByRemote { - in_substream: new_substream, - pending_opening, - }; - }, - State::OpenDesiredByRemote { .. } => { - // If a substream already exists, silently drop the new one. - // Note that we drop the substream, which will send an equivalent to a - // TCP "RST" to the remote and force-close the substream. It might - // seem like an unclean way to get rid of a substream. However, keep - // in mind that it is invalid for the remote to open multiple such - // substreams, and therefore sending a "RST" is the most correct thing - // to do. - return; - }, - State::Opening { ref mut in_substream, .. } | - State::Open { ref mut in_substream, .. } => { - if in_substream.is_some() { - // Same remark as above. - return; - } - - // Create `handshake_message` on a separate line to be sure that the - // lock is released as soon as possible. - let handshake_message = protocol_info.handshake.read().clone(); - new_substream.send_handshake(handshake_message); - *in_substream = Some(new_substream); - }, + protocol_info.state = State::OpenDesiredByRemote { + in_substream: new_substream, + pending_opening, }; - } - - // Received legacy substream. - EitherOutput::Second((substream, _handshake)) => { - // Note: while we awknowledge legacy substreams and handle incoming messages, - // it doesn't trigger any `OpenDesiredByRemote` event as a way to simplify the - // logic of this code. - // Since mid-2019, legacy substreams are supposed to be used at the same time as - // notifications substreams, and not in isolation. Nodes that open legacy - // substreams in isolation are considered deprecated. - if self.legacy_substreams.len() <= 4 { - self.legacy_substreams.push(substream); + }, + State::OpenDesiredByRemote { .. } => { + // If a substream already exists, silently drop the new one. + // Note that we drop the substream, which will send an equivalent to a + // TCP "RST" to the remote and force-close the substream. It might + // seem like an unclean way to get rid of a substream. However, keep + // in mind that it is invalid for the remote to open multiple such + // substreams, and therefore sending a "RST" is the most correct thing + // to do. + return; + }, + State::Opening { ref mut in_substream, .. } | + State::Open { ref mut in_substream, .. } => { + if in_substream.is_some() { + // Same remark as above. + return; } + + // Create `handshake_message` on a separate line to be sure that the + // lock is released as soon as possible. + let handshake_message = protocol_info.handshake.read().clone(); + new_substream.send_handshake(handshake_message); + *in_substream = Some(new_substream); }, } } @@ -683,11 +632,6 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::Close { protocol_index } => { - for mut substream in self.legacy_substreams.drain(..) { - substream.shutdown(); - self.legacy_shutdown.push(substream); - } - match self.protocols[protocol_index].state { State::Open { .. } => { self.protocols[protocol_index].state = State::Closed { @@ -752,10 +696,6 @@ impl ProtocolsHandler for NotifsHandler { } fn connection_keep_alive(&self) -> KeepAlive { - if !self.legacy_substreams.is_empty() { - return KeepAlive::Yes; - } - // `Yes` if any protocol has some activity. if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) { return KeepAlive::Yes; @@ -883,68 +823,8 @@ impl ProtocolsHandler for NotifsHandler { } } } - - // The legacy substreams are polled only if the state is `Open`. Otherwise, it would be - // possible to receive notifications that would need to get silently discarded. - if matches!(self.protocols[0].state, State::Open { .. }) { - for n in (0..self.legacy_substreams.len()).rev() { - let mut substream = self.legacy_substreams.swap_remove(n); - let poll_outcome = Pin::new(&mut substream).poll_next(cx); - match poll_outcome { - Poll::Pending => self.legacy_substreams.push(substream), - Poll::Ready(Some(Ok(RegisteredProtocolEvent::Message(message)))) => { - self.legacy_substreams.push(substream); - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::CustomMessage { message } - )) - }, - Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) => { - return Poll::Ready(ProtocolsHandlerEvent::Close( - NotifsHandlerError::SyncNotificationsClogged - )) - } - Poll::Ready(None) | Poll::Ready(Some(Err(_))) => { - if matches!(poll_outcome, Poll::Ready(None)) { - self.legacy_shutdown.push(substream); - } - - if let State::Open { out_substream, .. } = &mut self.protocols[0].state { - if !out_substream.is_some() { - *out_substream = None; - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::CloseDesired { - protocol_index: 0, - } - )) - } - } - } - } - } - } } - shutdown_list(&mut self.legacy_shutdown, cx); - Poll::Pending } } - -/// Given a list of substreams, tries to shut them down. The substreams that have been successfully -/// shut down are removed from the list. -fn shutdown_list - (list: &mut SmallVec>>, - cx: &mut Context) -{ - 'outer: for n in (0..list.len()).rev() { - let mut substream = list.swap_remove(n); - loop { - match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(_))) => {} - Poll::Pending => break, - Poll::Ready(Some(Err(_))) | Poll::Ready(None) => continue 'outer, - } - } - list.push(substream); - } -} diff --git a/substrate/client/network/src/protocol/generic_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs index 967c0e9f8d..2c80fe8523 100644 --- a/substrate/client/network/src/protocol/generic_proto/tests.rs +++ b/substrate/client/network/src/protocol/generic_proto/tests.rs @@ -80,10 +80,7 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new( - "test", &[1], vec![], peerset, - iter::once(("/foo".into(), Vec::new(), 1024 * 1024)) - ), + inner: GenericProto::new(peerset, iter::once(("/foo".into(), Vec::new(), 1024 * 1024))), addrs: addrs .iter() .enumerate() diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade.rs b/substrate/client/network/src/protocol/generic_proto/upgrade.rs index 6917742d8a..b23e5eab06 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade.rs @@ -15,13 +15,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . + pub use self::collec::UpgradeCollec; -pub use self::legacy::{ - RegisteredProtocol, - RegisteredProtocolEvent, - RegisteredProtocolName, - RegisteredProtocolSubstream -}; pub use self::notifications::{ NotificationsIn, NotificationsInSubstream, @@ -32,5 +27,4 @@ pub use self::notifications::{ }; mod collec; -mod legacy; mod notifications; diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs deleted file mode 100644 index 6a5ceb5571..0000000000 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ /dev/null @@ -1,293 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use crate::config::ProtocolId; -use bytes::BytesMut; -use futures::prelude::*; -use asynchronous_codec::Framed; -use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; -use parking_lot::RwLock; -use std::{collections::VecDeque, io, pin::Pin, sync::Arc, vec::IntoIter as VecIntoIter}; -use std::task::{Context, Poll}; -use unsigned_varint::codec::UviBytes; - -/// Connection upgrade for a single protocol. -/// -/// Note that "a single protocol" here refers to `par` for example. However -/// each protocol can have multiple different versions for networking purposes. -pub struct RegisteredProtocol { - /// Id of the protocol for API purposes. - id: ProtocolId, - /// Base name of the protocol as advertised on the network. - /// Ends with `/` so that we can append a version number behind. - base_name: Vec, - /// List of protocol versions that we support. - /// Ordered in descending order so that the best comes first. - supported_versions: Vec, - /// Handshake to send after the substream is open. - handshake_message: Arc>>, -} - -impl RegisteredProtocol { - /// Creates a new `RegisteredProtocol`. - pub fn new(protocol: impl Into, versions: &[u8], handshake_message: Arc>>) - -> Self { - let protocol = protocol.into(); - let mut base_name = b"/substrate/".to_vec(); - base_name.extend_from_slice(protocol.as_ref().as_bytes()); - base_name.extend_from_slice(b"/"); - - RegisteredProtocol { - base_name, - id: protocol, - supported_versions: { - let mut tmp = versions.to_vec(); - tmp.sort_by(|a, b| b.cmp(&a)); - tmp - }, - handshake_message, - } - } - - /// Returns the `Arc` to the handshake message that was passed at initialization. - pub fn handshake_message(&self) -> &Arc>> { - &self.handshake_message - } -} - -impl Clone for RegisteredProtocol { - fn clone(&self) -> Self { - RegisteredProtocol { - id: self.id.clone(), - base_name: self.base_name.clone(), - supported_versions: self.supported_versions.clone(), - handshake_message: self.handshake_message.clone(), - } - } -} - -/// Output of a `RegisteredProtocol` upgrade. -pub struct RegisteredProtocolSubstream { - /// If true, we are in the process of closing the sink. - is_closing: bool, - /// Buffer of packets to send. - send_queue: VecDeque, - /// If true, we should call `poll_complete` on the inner sink. - requires_poll_flush: bool, - /// The underlying substream. - inner: stream::Fuse>>, - /// If true, we have sent a "remote is clogged" event recently and shouldn't send another one - /// unless the buffer empties then fills itself again. - clogged_fuse: bool, -} - -impl RegisteredProtocolSubstream { - /// Starts a graceful shutdown process on this substream. - /// - /// Note that "graceful" means that we sent a closing message. We don't wait for any - /// confirmation from the remote. - /// - /// After calling this, the stream is guaranteed to finish soon-ish. - pub fn shutdown(&mut self) { - self.is_closing = true; - self.send_queue.clear(); - } -} - -/// Event produced by the `RegisteredProtocolSubstream`. -#[derive(Debug, Clone)] -pub enum RegisteredProtocolEvent { - /// Received a message from the remote. - Message(BytesMut), - - /// Diagnostic event indicating that the connection is clogged and we should avoid sending too - /// many messages to it. - Clogged, -} - -impl Stream for RegisteredProtocolSubstream -where TSubstream: AsyncRead + AsyncWrite + Unpin { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // Flushing the local queue. - while !self.send_queue.is_empty() { - match Pin::new(&mut self.inner).poll_ready(cx) { - Poll::Ready(Ok(())) => {}, - Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))), - Poll::Pending => break, - } - - if let Some(packet) = self.send_queue.pop_front() { - Pin::new(&mut self.inner).start_send(packet)?; - self.requires_poll_flush = true; - } - } - - // If we are closing, close as soon as the Sink is closed. - if self.is_closing { - return match Pin::new(&mut self.inner).poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(None), - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), - } - } - - // Indicating that the remote is clogged if that's the case. - if self.send_queue.len() >= 1536 { - if !self.clogged_fuse { - // Note: this fuse is important not just for preventing us from flooding the logs; - // if you remove the fuse, then we will always return early from this function and - // thus never read any message from the network. - self.clogged_fuse = true; - return Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) - } - } else { - self.clogged_fuse = false; - } - - // Flushing if necessary. - if self.requires_poll_flush { - if let Poll::Ready(()) = Pin::new(&mut self.inner).poll_flush(cx)? { - self.requires_poll_flush = false; - } - } - - // Receiving incoming packets. - // Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever. - match Pin::new(&mut self.inner).poll_next(cx)? { - Poll::Ready(Some(data)) => { - Poll::Ready(Some(Ok(RegisteredProtocolEvent::Message(data)))) - } - Poll::Ready(None) => - if !self.requires_poll_flush && self.send_queue.is_empty() { - Poll::Ready(None) - } else { - Poll::Pending - } - Poll::Pending => Poll::Pending, - } - } -} - -impl UpgradeInfo for RegisteredProtocol { - type Info = RegisteredProtocolName; - type InfoIter = VecIntoIter; - - #[inline] - fn protocol_info(&self) -> Self::InfoIter { - // Report each version as an individual protocol. - self.supported_versions.iter().map(|&version| { - let num = version.to_string(); - - let mut name = self.base_name.clone(); - name.extend_from_slice(num.as_bytes()); - RegisteredProtocolName { - name, - version, - } - }).collect::>().into_iter() - } -} - -/// Implementation of `ProtocolName` for a custom protocol. -#[derive(Debug, Clone)] -pub struct RegisteredProtocolName { - /// Protocol name, as advertised on the wire. - name: Vec, - /// Version number. Stored in string form in `name`, but duplicated here for easier retrieval. - version: u8, -} - -impl ProtocolName for RegisteredProtocolName { - fn protocol_name(&self) -> &[u8] { - &self.name - } -} - -impl InboundUpgrade for RegisteredProtocol -where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = (RegisteredProtocolSubstream, Vec); - type Future = Pin> + Send>>; - type Error = io::Error; - - fn upgrade_inbound( - self, - socket: TSubstream, - _: Self::Info, - ) -> Self::Future { - Box::pin(async move { - let mut framed = { - let mut codec = UviBytes::default(); - codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. - Framed::new(socket, codec) - }; - - let handshake = BytesMut::from(&self.handshake_message.read()[..]); - framed.send(handshake).await?; - let received_handshake = framed.next().await - .ok_or_else(|| io::ErrorKind::UnexpectedEof)??; - - Ok((RegisteredProtocolSubstream { - is_closing: false, - send_queue: VecDeque::new(), - requires_poll_flush: false, - inner: framed.fuse(), - clogged_fuse: false, - }, received_handshake.to_vec())) - }) - } -} - -impl OutboundUpgrade for RegisteredProtocol -where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = >::Output; - type Future = >::Future; - type Error = >::Error; - - fn upgrade_outbound( - self, - socket: TSubstream, - _: Self::Info, - ) -> Self::Future { - Box::pin(async move { - let mut framed = { - let mut codec = UviBytes::default(); - codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets. - Framed::new(socket, codec) - }; - - let handshake = BytesMut::from(&self.handshake_message.read()[..]); - framed.send(handshake).await?; - let received_handshake = framed.next().await - .ok_or_else(|| { - io::Error::new(io::ErrorKind::UnexpectedEof, "Failed to receive handshake") - })??; - - Ok((RegisteredProtocolSubstream { - is_closing: false, - send_queue: VecDeque::new(), - requires_poll_flush: false, - inner: framed.fuse(), - clogged_fuse: false, - }, received_handshake.to_vec())) - }) - } -}