Fix tried to send handshake twice (#5413)

* Fix tried to send handshake twice

* Fix wrong boolean

* Change to debug
This commit is contained in:
Pierre Krieger
2020-03-30 13:33:46 +02:00
committed by GitHub
parent e17a23e907
commit dbba4f8929
3 changed files with 34 additions and 18 deletions
@@ -63,7 +63,8 @@ use libp2p::swarm::{
SubstreamProtocol,
NegotiatedSubstream,
};
use log::error;
use log::{debug, error};
use sp_runtime::ConsensusEngineId;
use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
@@ -289,6 +290,9 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_event(&mut self, message: NotifsHandlerIn) {
match message {
NotifsHandlerIn::Enable => {
if let EnabledState::Enabled = self.enabled {
debug!("enabling already-enabled handler");
}
self.enabled = EnabledState::Enabled;
self.legacy.inject_event(LegacyProtoHandlerIn::Enable);
for handler in &mut self.out_handlers {
@@ -301,6 +305,9 @@ impl ProtocolsHandler for NotifsHandler {
}
},
NotifsHandlerIn::Disable => {
if let EnabledState::Disabled = self.enabled {
debug!("disabling already-disabled handler");
}
self.legacy.inject_event(LegacyProtoHandlerIn::Disable);
// The notifications protocols start in the disabled state. If we were in the
// "Initial" state, then we shouldn't disable the notifications protocols again.
@@ -36,7 +36,7 @@ use libp2p::swarm::{
};
use log::{error, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, fmt, pin::Pin, str, task::{Context, Poll}};
use std::{borrow::Cow, fmt, pin::Pin, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
@@ -156,16 +156,19 @@ impl ProtocolsHandler for NotifsInHandler {
&mut self,
(msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
) {
// If a substream already exists, we drop it and replace it with the new incoming one.
if self.substream.is_some() {
warn!(
target: "sub-libp2p",
"Received duplicate inbound notifications substream for {:?}",
str::from_utf8(self.in_protocol.protocol_name()),
);
return;
self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
}
// Note that we drop the existing substream, which will send an equivalent to a TCP "RST"
// to the remote and force-close the substream. It might seem like an unclean way to get
// rid of a substream. However, keep in mind that it is invalid for the remote to open
// multiple such substreams, and therefore sending a "RST" is the correct thing to do.
// Also note that we have already closed our writing side during the initial handshake,
// and we can't close "more" than that anyway.
self.substream = Some(proto);
self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg)));
self.pending_accept_refuses = self.pending_accept_refuses
.checked_add(1)
@@ -235,8 +238,15 @@ impl ProtocolsHandler for NotifsInHandler {
match self.substream.as_mut().map(|s| Stream::poll_next(Pin::new(s), cx)) {
None | Some(Poll::Pending) => {},
Some(Poll::Ready(Some(Ok(msg)))) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(msg))),
Some(Poll::Ready(Some(Ok(msg)))) => {
if self.pending_accept_refuses != 0 {
warn!(
target: "sub-libp2p",
"Bad state in inbound-only handler: notif before accepting substream"
);
}
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(msg)))
},
Some(Poll::Ready(None)) | Some(Poll::Ready(Some(Err(_)))) => {
self.substream = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
@@ -164,12 +164,9 @@ where TSubstream: AsyncRead + AsyncWrite,
{
/// Sends the handshake in order to inform the remote that we accept the substream.
pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
match self.handshake {
NotificationsInSubstreamHandshake::NotSent => {}
_ => {
error!(target: "sub-libp2p", "Tried to send handshake twice");
return;
}
if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
error!(target: "sub-libp2p", "Tried to send handshake twice");
return;
}
self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
@@ -189,8 +186,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
NotificationsInSubstreamHandshake::Sent =>
return Stream::poll_next(this.socket.as_mut(), cx),
NotificationsInSubstreamHandshake::NotSent =>
return Poll::Pending,
NotificationsInSubstreamHandshake::NotSent => {
*this.handshake = NotificationsInSubstreamHandshake::NotSent;
return Poll::Pending
},
NotificationsInSubstreamHandshake::PendingSend(msg) =>
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {