No longer actively open legacy substreams (#7076)

* Allow remotes to not open a legacy substream

* No longer actively open legacy substreams

* Misc fixes

* Line width

* Special case first protocol as the one bearing the handshake

* Legacy opening state no longer keeps connection alive

* Remove now-unused code

* Simplify inject_dial_upgrade_error

* [chaos:basic]

* [chaos:basic]

* [chaos:basic]
This commit is contained in:
Pierre Krieger
2020-10-16 13:07:05 +02:00
committed by GitHub
parent 0dcb16b513
commit 385c4ddf69
3 changed files with 61 additions and 172 deletions
@@ -1349,27 +1349,6 @@ impl NetworkBehaviour for GenericProto {
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
// Don't do anything for non-severe errors except report them.
NotifsHandlerOut::ProtocolError { is_severe, ref error } if !is_severe => {
debug!(target: "sub-libp2p", "Handler({:?}) => Benign protocol error: {:?}",
source, error)
}
NotifsHandlerOut::ProtocolError { error, .. } => {
debug!(target: "sub-libp2p",
"Handler({:?}) => Severe protocol error: {:?}",
source, error);
// A severe protocol error happens when we detect a "bad" peer, such as a peer on
// a different chain, or a peer that doesn't speak the same protocol(s). We
// decrease the peer's reputation, hence lowering the chances we try this peer
// again in the short term.
self.peerset.report_peer(
source.clone(),
sc_peerset::ReputationChange::new(i32::min_value(), "Protocol error")
);
self.disconnect_peer_inner(&source, Some(Duration::from_secs(5)));
}
}
}
@@ -53,8 +53,8 @@ use crate::protocol::generic_proto::{
};
use bytes::BytesMut;
use libp2p::core::{either::{EitherError, EitherOutput}, ConnectedPoint, PeerId};
use libp2p::core::upgrade::{EitherUpgrade, UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p::core::{either::EitherOutput, ConnectedPoint, PeerId};
use libp2p::core::upgrade::{UpgradeError, SelectUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
@@ -70,7 +70,7 @@ use futures::{
};
use log::{debug, error};
use parking_lot::{Mutex, RwLock};
use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}};
use std::{borrow::Cow, str, sync::Arc, task::{Context, Poll}};
/// Number of pending notifications in asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
@@ -230,14 +230,6 @@ pub enum NotifsHandlerOut {
/// Message that has been received.
message: BytesMut,
},
/// An error has happened on the protocol level with this node.
ProtocolError {
/// If true the error is severe, such as a protocol violation.
is_severe: bool,
/// The error that happened.
error: Box<dyn error::Error + Send + Sync>,
},
}
/// Sink connected directly to the node background task. Allows sending notifications to the peer.
@@ -401,9 +393,9 @@ impl ProtocolsHandler for NotifsHandler {
type OutEvent = NotifsHandlerOut;
type Error = NotifsHandlerError;
type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>;
type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>;
// Index within the `out_handlers`; None for legacy
type OutboundOpenInfo = Option<usize>;
type OutboundProtocol = NotificationsOut;
// Index within the `out_handlers`
type OutboundOpenInfo = usize;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
@@ -433,13 +425,7 @@ impl ProtocolsHandler for NotifsHandler {
out: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
num: Self::OutboundOpenInfo
) {
match (out, num) {
(EitherOutput::First(out), Some(num)) =>
self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ()),
(EitherOutput::Second(out), None) =>
self.legacy.inject_fully_negotiated_outbound(out, ()),
_ => error!("inject_fully_negotiated_outbound called with wrong parameters"),
}
self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ())
}
fn inject_event(&mut self, message: NotifsHandlerIn) {
@@ -488,45 +474,30 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_dial_upgrade_error(
&mut self,
num: Option<usize>,
err: ProtocolsHandlerUpgrErr<EitherError<NotificationsHandshakeError, io::Error>>
num: usize,
err: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
) {
match (err, num) {
(ProtocolsHandlerUpgrErr::Timeout, Some(num)) =>
match err {
ProtocolsHandlerUpgrErr::Timeout =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Timeout
),
(ProtocolsHandlerUpgrErr::Timeout, None) =>
self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout),
(ProtocolsHandlerUpgrErr::Timer, Some(num)) =>
ProtocolsHandlerUpgrErr::Timer =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Timer
),
(ProtocolsHandlerUpgrErr::Timer, None) =>
self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timer),
(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), Some(num)) =>
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))
),
(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), None) =>
self.legacy.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))
),
(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), Some(num)) =>
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) =>
self.out_handlers[num].0.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err))
),
(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))), None) =>
self.legacy.inject_dial_upgrade_error(
(),
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err))
),
_ => error!("inject_dial_upgrade_error called with bad parameters"),
}
}
@@ -632,12 +603,8 @@ impl ProtocolsHandler for NotifsHandler {
if self.pending_handshake.is_none() {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol
.map_upgrade(EitherUpgrade::B)
.map_info(|()| None)
}),
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, .. } =>
match *protocol.info() {},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
received_handshake,
..
@@ -663,10 +630,6 @@ impl ProtocolsHandler for NotifsHandler {
NotifsHandlerOut::CustomMessage { message }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::ProtocolError { is_severe, error }
)),
ProtocolsHandlerEvent::Close(err) =>
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
}
@@ -723,8 +686,7 @@ impl ProtocolsHandler for NotifsHandler {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol
.map_upgrade(EitherUpgrade::A)
.map_info(|()| Some(handler_num))
.map_info(|()| handler_num),
}),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
@@ -30,8 +30,8 @@ use libp2p::swarm::{
};
use log::{debug, error};
use smallvec::{smallvec, SmallVec};
use std::{borrow::Cow, collections::VecDeque, error, fmt, io, mem, time::Duration};
use std::{pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, collections::VecDeque, convert::Infallible, error, fmt, io, mem};
use std::{pin::Pin, task::{Context, Poll}, time::Duration};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
@@ -108,10 +108,9 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto {
self.protocol.clone()
}
fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
fn into_handler(self, remote_peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler {
LegacyProtoHandler {
protocol: self.protocol,
endpoint: connected_point.clone(),
remote_peer_id: remote_peer_id.clone(),
state: ProtocolState::Init {
substreams: SmallVec::new(),
@@ -134,15 +133,13 @@ pub struct LegacyProtoHandler {
/// any influence on the behaviour.
remote_peer_id: PeerId,
/// Whether we are the connection dialer or listener. Used to determine who, between the local
/// node and the remote node, has priority.
endpoint: ConnectedPoint,
/// Queue of events to send to the outside.
///
/// This queue must only ever be modified to insert elements at the back, or remove the first
/// element.
events_queue: VecDeque<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>>,
events_queue: VecDeque<
ProtocolsHandlerEvent<RegisteredProtocol, Infallible, LegacyProtoHandlerOut, ConnectionKillError>
>,
}
/// State of the handler.
@@ -156,12 +153,9 @@ enum ProtocolState {
init_deadline: Delay,
},
/// Handler is opening a substream in order to activate itself.
/// Handler is ready to accept incoming substreams.
/// If we are in this state, we haven't sent any `CustomProtocolOpen` yet.
Opening {
/// Deadline after which the opening is abnormally long.
deadline: Delay,
},
Opening,
/// Normal operating mode. Contains the substreams that are open.
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
@@ -229,14 +223,6 @@ pub enum LegacyProtoHandlerOut {
/// Message that has been received.
message: BytesMut,
},
/// An error has happened on the protocol level with this node.
ProtocolError {
/// If true the error is severe, such as a protocol violation.
is_severe: bool,
/// The error that happened.
error: Box<dyn error::Error + Send + Sync>,
},
}
impl LegacyProtoHandler {
@@ -251,14 +237,7 @@ impl LegacyProtoHandler {
ProtocolState::Init { substreams: mut incoming, .. } => {
if incoming.is_empty() {
if let ConnectedPoint::Dialer { .. } = self.endpoint {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone(), ()),
});
}
ProtocolState::Opening {
deadline: Delay::new(Duration::from_secs(60))
}
ProtocolState::Opening
} else {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: incoming[0].0.protocol_version(),
@@ -316,7 +295,7 @@ impl LegacyProtoHandler {
/// Polls the state for events. Optionally returns an event to produce.
#[must_use]
fn poll_state(&mut self, cx: &mut Context)
-> Option<ProtocolsHandlerEvent<RegisteredProtocol, (), LegacyProtoHandlerOut, ConnectionKillError>> {
-> Option<ProtocolsHandlerEvent<RegisteredProtocol, Infallible, LegacyProtoHandlerOut, ConnectionKillError>> {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
@@ -340,21 +319,9 @@ impl LegacyProtoHandler {
None
}
ProtocolState::Opening { mut deadline } => {
match Pin::new(&mut deadline).poll(cx) {
Poll::Ready(()) => {
let event = LegacyProtoHandlerOut::ProtocolError {
is_severe: true,
error: "Timeout when opening protocol".to_string().into(),
};
self.state = ProtocolState::KillAsap;
Some(ProtocolsHandlerEvent::Custom(event))
},
Poll::Pending => {
self.state = ProtocolState::Opening { deadline };
None
},
}
ProtocolState::Opening => {
self.state = ProtocolState::Opening;
None
}
ProtocolState::Normal { mut substreams, mut shutdown } => {
@@ -423,27 +390,35 @@ impl LegacyProtoHandler {
// If `reenable` is `true`, that means we should open the substreams system again
// after all the substreams are closed.
if reenable && shutdown.is_empty() {
self.state = ProtocolState::Opening {
deadline: Delay::new(Duration::from_secs(60))
};
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone(), ()),
})
self.state = ProtocolState::Opening;
} else {
self.state = ProtocolState::Disabled { shutdown, reenable };
None
}
None
}
ProtocolState::KillAsap => None,
}
}
}
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
impl ProtocolsHandler for LegacyProtoHandler {
type InEvent = LegacyProtoHandlerIn;
type OutEvent = LegacyProtoHandlerOut;
type Error = ConnectionKillError;
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = Infallible;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.protocol.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
mut substream: RegisteredProtocolSubstream<NegotiatedSubstream>,
received_handshake: Vec<u8>,
(mut substream, received_handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
@@ -487,35 +462,13 @@ impl LegacyProtoHandler {
ProtocolState::KillAsap => ProtocolState::KillAsap,
};
}
}
impl ProtocolsHandler for LegacyProtoHandler {
type InEvent = LegacyProtoHandlerIn;
type OutEvent = LegacyProtoHandlerOut;
type Error = ConnectionKillError;
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.protocol.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
(substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
self.inject_fully_negotiated(substream, handshake);
}
fn inject_fully_negotiated_outbound(
&mut self,
(substream, handshake): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
unreachable: Self::OutboundOpenInfo
) {
self.inject_fully_negotiated(substream, handshake);
match unreachable {}
}
fn inject_event(&mut self, message: LegacyProtoHandlerIn) {
@@ -525,24 +478,19 @@ impl ProtocolsHandler for LegacyProtoHandler {
}
}
fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr<io::Error>) {
let is_severe = match err {
ProtocolsHandlerUpgrErr::Upgrade(_) => true,
_ => false,
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError {
is_severe,
error: Box::new(err),
}));
fn inject_dial_upgrade_error(
&mut self,
unreachable: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<io::Error>
) {
match unreachable {}
}
fn connection_keep_alive(&self) -> KeepAlive {
match self.state {
ProtocolState::Init { .. } | ProtocolState::Opening { .. } |
ProtocolState::Normal { .. } => KeepAlive::Yes,
ProtocolState::Disabled { .. } | ProtocolState::Poisoned |
ProtocolState::KillAsap => KeepAlive::No,
ProtocolState::Init { .. } | ProtocolState::Normal { .. } => KeepAlive::Yes,
ProtocolState::Opening { .. } | ProtocolState::Disabled { .. } |
ProtocolState::Poisoned | ProtocolState::KillAsap => KeepAlive::No,
}
}