Revert "Properly close notification substreams (#8534)" (#8646)

This reverts commit 6c9c687a31.

Co-authored-by: André Silva <andrerfosilva@gmail.com>
This commit is contained in:
Pierre Krieger
2021-04-20 20:39:07 +02:00
committed by GitHub
parent 86c5dc848f
commit 54ae55f439
2 changed files with 65 additions and 300 deletions
@@ -159,16 +159,6 @@ enum State {
Closed {
/// True if an outgoing substream is still in the process of being opened.
pending_opening: bool,
/// Outbound substream that has been accepted by the remote. Being closed.
out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
/// Substream opened by the remote. Being closed.
in_substream_closing: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Substream re-opened by the remote. Not to be closed after `in_substream_closing` has
/// been closed.
in_substream_reopened: Option<NotificationsInSubstream<NegotiatedSubstream>>,
},
/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
@@ -177,9 +167,6 @@ enum State {
/// Substream opened by the remote and that hasn't been accepted/rejected yet.
in_substream: NotificationsInSubstream<NegotiatedSubstream>,
/// Outbound substream that has been accepted by the remote. Being closed.
out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
/// See [`State::Closed::pending_opening`].
pending_opening: bool,
},
@@ -190,15 +177,8 @@ enum State {
/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
/// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`].
Opening {
/// Outbound substream that has been accepted by the remote. Being closed. An outbound
/// substream request has been emitted towards libp2p if and only if this field is `None`.
out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
/// Substream re-opened by the remote. Has been accepted.
in_substream_reopened: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Substream opened by the remote. Being closed.
in_substream_closing: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Substream opened by the remote. If `Some`, has been accepted.
in_substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
},
/// Protocol is in the "Open" state.
@@ -247,9 +227,6 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
handshake,
state: State::Closed {
pending_opening: false,
in_substream_closing: None,
in_substream_reopened: None,
out_substream_closing: None,
},
max_notification_size: max_size,
}
@@ -510,16 +487,7 @@ impl ProtocolsHandler for NotifsHandler {
) {
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed {
ref mut pending_opening,
ref mut out_substream_closing,
ref mut in_substream_closing,
ref mut in_substream_reopened
}
if in_substream_closing.is_none() && in_substream_reopened.is_none()
=> {
debug_assert!(!(out_substream_closing.is_some() && *pending_opening));
State::Closed { pending_opening } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
@@ -528,31 +496,9 @@ impl ProtocolsHandler for NotifsHandler {
protocol_info.state = State::OpenDesiredByRemote {
in_substream: new_substream,
out_substream_closing: out_substream_closing.take(),
pending_opening: *pending_opening,
pending_opening,
};
},
State::Opening { ref mut in_substream_closing, ref mut in_substream_reopened, .. } => {
*in_substream_closing = None;
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream_reopened = Some(new_substream);
},
State::Open { ref mut in_substream, .. } if in_substream.is_none() => {
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
},
State::Closed { .. } |
State::Open { .. } |
State::OpenDesiredByRemote { .. } => {
// If a substream already exists, silently drop the new one.
// Note that we drop the substream, which will send an equivalent to a
@@ -563,6 +509,19 @@ impl ProtocolsHandler for NotifsHandler {
// to do.
return;
},
State::Opening { ref mut in_substream, .. } |
State::Open { ref mut in_substream, .. } => {
if in_substream.is_some() {
// Same remark as above.
return;
}
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
},
}
}
@@ -572,24 +531,16 @@ impl ProtocolsHandler for NotifsHandler {
protocol_index: Self::OutboundOpenInfo
) {
match self.protocols[protocol_index].state {
State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } |
State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => {
debug_assert!(out_substream_closing.is_none());
State::Closed { ref mut pending_opening } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
debug_assert!(*pending_opening);
*out_substream_closing = Some(substream);
*pending_opening = false;
}
State::Open { .. } => {
error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
debug_assert!(false);
}
State::Opening {
ref mut in_substream_reopened, ref mut in_substream_closing,
ref mut out_substream_closing
} => {
debug_assert!(out_substream_closing.is_none());
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
State::Opening { ref mut in_substream } => {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
@@ -603,7 +554,7 @@ impl ProtocolsHandler for NotifsHandler {
self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
out_substream: Some(substream),
in_substream: in_substream_reopened.take().or(in_substream_closing.take()),
in_substream: in_substream.take(),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
@@ -623,13 +574,8 @@ impl ProtocolsHandler for NotifsHandler {
NotifsHandlerIn::Open { protocol_index } => {
let protocol_info = &mut self.protocols[protocol_index];
match &mut protocol_info.state {
State::Closed {
ref mut pending_opening,
ref mut in_substream_closing,
ref mut in_substream_reopened,
ref mut out_substream_closing
} => {
if !*pending_opening && out_substream_closing.is_none() {
State::Closed { pending_opening } => {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone(),
@@ -642,31 +588,14 @@ impl ProtocolsHandler for NotifsHandler {
});
}
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
protocol_info.state = State::Opening {
in_substream_closing: in_substream_closing.take(),
in_substream_reopened: in_substream_reopened.take(),
out_substream_closing: out_substream_closing.take(),
in_substream: None,
};
},
State::OpenDesiredByRemote { .. } => {
// The state change is done in two steps because of borrowing issues.
let (pending_opening, out_substream_closing, mut in_substream) = match
mem::replace(&mut protocol_info.state,
State::Opening {
in_substream_closing: None, in_substream_reopened: None,
out_substream_closing: None,
})
{
State::OpenDesiredByRemote { pending_opening, out_substream_closing, in_substream, .. } =>
(pending_opening, out_substream_closing, in_substream),
_ => unreachable!()
};
State::OpenDesiredByRemote { pending_opening, in_substream } => {
let handshake_message = protocol_info.handshake.read().clone();
if !pending_opening && out_substream_closing.is_none() {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
handshake_message.clone(),
@@ -681,13 +610,17 @@ impl ProtocolsHandler for NotifsHandler {
in_substream.send_handshake(handshake_message);
// The state change is done in two steps because of borrowing issues.
let in_substream = match
mem::replace(&mut protocol_info.state, State::Opening { in_substream: None })
{
State::OpenDesiredByRemote { in_substream, .. } => in_substream,
_ => unreachable!()
};
protocol_info.state = State::Opening {
out_substream_closing,
in_substream_closing: None,
in_substream_reopened: Some(in_substream),
in_substream: Some(in_substream),
};
},
State::Opening { .. } |
State::Open { .. } => {
// As documented, it is forbidden to send an `Open` while there is already
@@ -699,30 +632,15 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::Close { protocol_index } => {
match &mut self.protocols[protocol_index].state {
State::Open { in_substream, out_substream, .. } => {
if let Some(in_substream) = in_substream.as_mut() {
in_substream.set_close_desired();
}
match self.protocols[protocol_index].state {
State::Open { .. } => {
self.protocols[protocol_index].state = State::Closed {
in_substream_closing: in_substream.take(),
in_substream_reopened: None,
out_substream_closing: out_substream.take(),
pending_opening: false,
};
},
State::Opening { in_substream_closing, in_substream_reopened, out_substream_closing } => {
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
let pending_opening = out_substream_closing.is_none();
if let Some(in_substream_reopened) = in_substream_reopened.as_mut() {
in_substream_reopened.set_close_desired();
}
State::Opening { .. } => {
self.protocols[protocol_index].state = State::Closed {
in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()),
in_substream_reopened: None,
out_substream_closing: out_substream_closing.take(),
pending_opening,
pending_opening: true,
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
@@ -731,23 +649,8 @@ impl ProtocolsHandler for NotifsHandler {
}
));
},
State::OpenDesiredByRemote { .. } => {
let (mut in_substream, pending_opening, out_substream_closing) = match mem::replace(
&mut self.protocols[protocol_index].state,
State::Closed { pending_opening: false, in_substream_closing: None,
in_substream_reopened: None, out_substream_closing: None,
}
) {
State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } =>
(in_substream, pending_opening, out_substream_closing),
_ => unreachable!("Can only enter this branch after OpenDesiredByRemote; qed")
};
in_substream.set_close_desired();
State::OpenDesiredByRemote { pending_opening, .. } => {
self.protocols[protocol_index].state = State::Closed {
in_substream_closing: Some(in_substream),
in_substream_reopened: None,
out_substream_closing,
pending_opening,
};
}
@@ -769,30 +672,14 @@ impl ProtocolsHandler for NotifsHandler {
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
) {
match self.protocols[num].state {
State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } |
State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => {
debug_assert!(out_substream_closing.is_none());
State::Closed { ref mut pending_opening } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
debug_assert!(*pending_opening);
*pending_opening = false;
}
State::Opening {
ref mut out_substream_closing,
ref mut in_substream_closing,
ref mut in_substream_reopened,
..
} => {
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
debug_assert!(out_substream_closing.is_none());
if let Some(in_substream_reopened) = in_substream_reopened.as_mut() {
in_substream_reopened.set_close_desired();
}
State::Opening { .. } => {
self.protocols[num].state = State::Closed {
out_substream_closing: None,
in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()),
in_substream_reopened: None,
pending_opening: false,
};
@@ -901,67 +788,15 @@ impl ProtocolsHandler for NotifsHandler {
}
}
// Try close outbound substreams that are marked for closing.
for protocol_index in 0..self.protocols.len() {
match &mut self.protocols[protocol_index].state {
State::Closed { out_substream_closing: ref mut substream @ Some(_), .. } |
State::OpenDesiredByRemote { out_substream_closing: ref mut substream @ Some(_), .. } |
State::Opening { out_substream_closing: ref mut substream @ Some(_), .. } => {
match Sink::poll_close(Pin::new(substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
Poll::Ready(_) => {
*substream = None;
if matches!(self.protocols[protocol_index].state, State::Opening { .. }) {
let protocol_info = &mut self.protocols[protocol_index];
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone(),
protocol_info.max_notification_size
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
});
}
}
}
}
_ => {}
}
if let State::Closed {
pending_opening,
out_substream_closing: None,
in_substream_closing,
in_substream_reopened: ref mut in_substream_reopened @ Some(_),
..
} = &mut self.protocols[protocol_index].state {
debug_assert!(!*pending_opening);
debug_assert!(in_substream_closing.is_none());
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
}
));
self.protocols[protocol_index].state = State::OpenDesiredByRemote {
in_substream: in_substream_reopened.take()
.expect("The if let above ensures that this is Some ; qed"),
out_substream_closing: None,
pending_opening: false,
};
}
}
// Poll inbound substreams.
for protocol_index in 0..self.protocols.len() {
// Inbound substreams being closed is always tolerated, except for the
// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
match &mut self.protocols[protocol_index].state {
State::Open { in_substream: None, .. } => {}
State::Closed { .. } |
State::Open { in_substream: None, .. } |
State::Opening { in_substream: None } => {}
State::Open { in_substream: in_substream @ Some(_), .. } => {
match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
@@ -977,16 +812,13 @@ impl ProtocolsHandler for NotifsHandler {
}
}
State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => {
State::OpenDesiredByRemote { in_substream, pending_opening } => {
match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: *pending_opening,
in_substream_closing: None,
in_substream_reopened: None,
out_substream_closing: out_substream_closing.take(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired { protocol_index }
@@ -995,40 +827,13 @@ impl ProtocolsHandler for NotifsHandler {
}
}
State::Opening { in_substream_closing: None, in_substream_reopened: None, .. } |
State::Closed { in_substream_closing: None, in_substream_reopened: None, .. } => {}
State::Opening {
in_substream_closing: ref mut in_substream @ Some(_),
in_substream_reopened: None,
..
} |
State::Opening {
in_substream_closing: None,
in_substream_reopened: ref mut in_substream @ Some(_),
..
} |
State::Closed {
in_substream_closing: ref mut in_substream @ Some(_),
in_substream_reopened: None,
..
} |
State::Closed {
in_substream_closing: None,
in_substream_reopened: ref mut in_substream @ Some(_),
..
} => {
State::Opening { in_substream: in_substream @ Some(_), .. } => {
match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => *in_substream = None,
}
}
State::Opening { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } |
State::Closed { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } => {
debug_assert!(false);
}
}
}
@@ -88,11 +88,10 @@ enum NotificationsInSubstreamHandshake {
PendingSend(Vec<u8>),
/// Handshake message was pushed in the socket. Still need to flush.
Flush,
/// Ready to receive notifications. Handshake message successfully sent and flushed, or
/// sending side closed before handshake sent.
Normal { write_side_open: bool },
/// Closing our writing side.
Closing { remote_write_open: bool },
/// Handshake message successfully sent and flushed.
Sent,
/// Remote has closed their writing side. We close our own writing side in return.
ClosingInResponseToRemote,
/// Both our side and the remote have closed their writing side.
BothSidesClosed,
}
@@ -170,30 +169,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
impl<TSubstream> NotificationsInSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
{
/// Closes the writing side of the substream, indicating to the remote that we would like this
/// substream to be closed.
pub fn set_close_desired(&mut self) {
match self.handshake {
NotificationsInSubstreamHandshake::PendingSend(_) |
NotificationsInSubstreamHandshake::Flush |
NotificationsInSubstreamHandshake::NotSent |
NotificationsInSubstreamHandshake::Normal { write_side_open: true } => {
self.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open: true };
}
NotificationsInSubstreamHandshake::Normal { write_side_open: false } |
NotificationsInSubstreamHandshake::Closing { .. } |
NotificationsInSubstreamHandshake::BothSidesClosed => {}
}
}
/// Sends the handshake in order to inform the remote that we accept the substream.
///
/// Has no effect if `set_close_desired` has been called.
pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
if matches!(self.handshake, NotificationsInSubstreamHandshake::Normal { write_side_open: false }) {
return;
}
if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
error!(target: "sub-libp2p", "Tried to send handshake twice");
return;
@@ -208,7 +185,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
let mut this = self.project();
loop {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
NotificationsInSubstreamHandshake::PendingSend(msg) =>
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {
@@ -226,28 +203,16 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true },
*this.handshake = NotificationsInSubstreamHandshake::Sent,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::Closing { remote_write_open } =>
match Sink::poll_close(this.socket.as_mut(), cx)? {
Poll::Ready(()) => if remote_write_open {
*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false }
} else {
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed;
},
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open };
return Poll::Pending
}
},
st @ NotificationsInSubstreamHandshake::NotSent |
st @ NotificationsInSubstreamHandshake::Normal { .. } |
st @ NotificationsInSubstreamHandshake::Sent |
st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
*this.handshake = st;
return Poll::Pending;
@@ -267,7 +232,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
// This `Stream` implementation first tries to send back the handshake if necessary.
loop {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::NotSent) {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
NotificationsInSubstreamHandshake::NotSent => {
*this.handshake = NotificationsInSubstreamHandshake::NotSent;
return Poll::Pending
@@ -289,39 +254,34 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: true },
*this.handshake = NotificationsInSubstreamHandshake::Sent,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::Normal { write_side_open } => {
NotificationsInSubstreamHandshake::Sent => {
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Ready(None) if write_side_open =>
*this.handshake =
NotificationsInSubstreamHandshake::Closing { remote_write_open: false },
Poll::Ready(None) =>
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
Poll::Ready(None) => *this.handshake =
NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
Poll::Ready(Some(msg)) => {
*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open };
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Ready(Some(msg))
},
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open };
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Pending
},
}
},
NotificationsInSubstreamHandshake::Closing { remote_write_open } =>
NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
match Sink::poll_close(this.socket.as_mut(), cx)? {
Poll::Ready(()) if remote_write_open =>
*this.handshake = NotificationsInSubstreamHandshake::Normal { write_side_open: false },
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Closing { remote_write_open };
*this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
return Poll::Pending
}
},