Don't close inbound notifications substreams immediately (#6781)

* Don't close inbound notifications substreams immediately

* Fix not closing in return to node A closing
This commit is contained in:
Pierre Krieger
2020-07-31 16:42:53 +02:00
committed by GitHub
parent 9ca2300b05
commit a1786a92ec
3 changed files with 46 additions and 16 deletions
@@ -163,11 +163,9 @@ impl ProtocolsHandler for NotifsInHandler {
}
// Note that we drop the existing substream, which will send an equivalent to a TCP "RST"
// to the remote and force-close the substream. It might seem like an unclean way to get
// to the remote and force-close the substream. It might seem like an unclean way to get
// rid of a substream. However, keep in mind that it is invalid for the remote to open
// multiple such substreams, and therefore sending a "RST" is the correct thing to do.
// Also note that we have already closed our writing side during the initial handshake,
// and we can't close "more" than that anyway.
// multiple such substreams, and therefore sending a "RST" is not an incorrect thing to do.
self.substream = Some(proto);
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg)));
@@ -22,12 +22,13 @@
/// higher-level logic. This message is prefixed with a variable-length integer message length.
/// This message can be empty, in which case `0` is sent.
/// - If node B accepts the substream, it sends back a message with the same properties.
/// Afterwards, the sending side of B is closed.
/// - If instead B refuses the connection (which typically happens because no empty slot is
/// available), then it immediately closes the substream without sending back anything.
/// - Node A can then send notifications to B, prefixed with a variable-length integer indicating
/// the length of the message.
/// - Node A closes its writing side if it doesn't want the notifications substream anymore.
/// - Either node A or node B can signal that it doesn't want this notifications substream anymore
/// by closing its writing side. The other party should respond by also closing their own
/// writing side soon after.
///
/// Notification substreams are unidirectional. If A opens a substream with B, then B is
/// encouraged but not required to open a substream to A as well.
@@ -80,9 +81,13 @@ enum NotificationsInSubstreamHandshake {
/// User gave us the handshake message. Trying to push it in the socket.
PendingSend(Vec<u8>),
/// Handshake message was pushed in the socket. Still need to flush.
Close,
/// Handshake message successfully sent.
Flush,
/// Handshake message successfully sent and flushed.
Sent,
/// Remote has closed their writing side. We close our own writing side in return.
ClosingInResponseToRemote,
/// Both our side and the remote have closed their writing side.
BothSidesClosed,
}
/// A substream for outgoing notification messages.
@@ -177,8 +182,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
// This `Stream` implementation first tries to send back the handshake if necessary.
loop {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
NotificationsInSubstreamHandshake::Sent =>
return Stream::poll_next(this.socket.as_mut(), cx),
NotificationsInSubstreamHandshake::NotSent => {
*this.handshake = NotificationsInSubstreamHandshake::NotSent;
return Poll::Pending
@@ -186,7 +189,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
NotificationsInSubstreamHandshake::PendingSend(msg) =>
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {
*this.handshake = NotificationsInSubstreamHandshake::Close;
*this.handshake = NotificationsInSubstreamHandshake::Flush;
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
Ok(()) => {},
Err(err) => return Poll::Ready(Some(Err(err))),
@@ -197,15 +200,43 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::Close =>
match Sink::poll_close(this.socket.as_mut(), cx)? {
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::Sent,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Close;
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::Sent => {
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Ready(None) => *this.handshake =
NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
Poll::Ready(Some(msg)) => {
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Ready(Some(msg))
},
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Pending
},
}
},
NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
match Sink::poll_close(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::BothSidesClosed =>
return Poll::Ready(None),
}
}
}