Ensure that handshake is sent back even in case of back-pressure (#6979)

* Ensure that handshake is sent back even in case of back-pressure

* Update client/network/src/protocol/generic_proto/handler/group.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Also process OpenRequest and Closed

* Fix bad merge

* God I'm so lost with all these merges

* Immediately return Closed

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-09-02 16:30:41 +02:00
committed by GitHub
parent aa5a10802c
commit 2f9e2577c1
3 changed files with 110 additions and 29 deletions
@@ -674,36 +674,48 @@ impl ProtocolsHandler for NotifsHandler {
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
}
}
}
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
while let Poll::Ready(ev) = handler.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
error!("Incoming substream handler tried to open a substream"),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
loop {
let poll = if self.pending_legacy_handshake.is_none() {
handler.poll(cx)
} else {
handler.poll_process(cx)
};
let ev = match poll {
Poll::Ready(e) => e,
Poll::Pending => break,
};
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
error!("Incoming substream handler tried to open a substream"),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification {
message,
protocol_name: handler.protocol_name().clone(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
}
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
},
}
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
debug_assert!(self.pending_legacy_handshake.is_none());
if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification {
message,
protocol_name: handler.protocol_name().clone(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
}
},
}
}
}
@@ -139,6 +139,33 @@ impl NotifsInHandler {
pub fn protocol_name(&self) -> &Cow<'static, str> {
self.in_protocol.protocol_name()
}
/// Equivalent to the `poll` method of `ProtocolsHandler`, except that it is guaranteed to
/// never generate [`NotifsInHandlerOut::Notif`].
///
/// Use this method in situations where it is not desirable to receive events but still
/// necessary to drive any potential incoming handshake or request.
pub fn poll_process(
&mut self,
cx: &mut Context
) -> Poll<
ProtocolsHandlerEvent<DeniedUpgrade, (), NotifsInHandlerOut, void::Void>
> {
if let Some(event) = self.events_queue.pop_front() {
return Poll::Ready(event)
}
match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) {
None | Some(Poll::Pending) => {},
Some(Poll::Ready(Ok(v))) => match v {},
Some(Poll::Ready(Err(_))) => {
self.substream = None;
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed));
},
}
Poll::Pending
}
}
impl ProtocolsHandler for NotifsInHandler {
@@ -39,7 +39,7 @@ use futures::prelude::*;
use futures_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
@@ -162,7 +162,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
}
impl<TSubstream> NotificationsInSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite,
where TSubstream: AsyncRead + AsyncWrite + Unpin,
{
/// Sends the handshake in order to inform the remote that we accept the substream.
pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
@@ -173,6 +173,48 @@ where TSubstream: AsyncRead + AsyncWrite,
self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
}
/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
/// guaranteed to not generate any notification.
pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Infallible, io::Error>> {
let mut this = self.project();
loop {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
NotificationsInSubstreamHandshake::PendingSend(msg) =>
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
Ok(()) => {},
Err(err) => return Poll::Ready(Err(err)),
}
},
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::Sent,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},
st @ NotificationsInSubstreamHandshake::NotSent |
st @ NotificationsInSubstreamHandshake::Sent |
st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
*this.handshake = st;
return Poll::Pending;
}
}
}
}
}
impl<TSubstream> Stream for NotificationsInSubstream<TSubstream>