Properly close notification substreams (#8534)

* Properly close notification substreams

* Some debug asserts

* Fix state inconsistency

* Remove erroneous debug_assert!

* Some comments
This commit is contained in:
Pierre Krieger
2021-04-15 12:42:47 +02:00
committed by GitHub
parent 14b5acab86
commit 6c9c687a31
2 changed files with 300 additions and 65 deletions
@@ -159,6 +159,16 @@ enum State {
Closed {
/// True if an outgoing substream is still in the process of being opened.
pending_opening: bool,
/// Outbound substream that has been accepted by the remote. Being closed.
out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
/// Substream opened by the remote. Being closed.
in_substream_closing: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Substream re-opened by the remote. Not to be closed after `in_substream_closing` has
/// been closed.
in_substream_reopened: Option<NotificationsInSubstream<NegotiatedSubstream>>,
},
/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
@@ -167,6 +177,9 @@ enum State {
/// Substream opened by the remote and that hasn't been accepted/rejected yet.
in_substream: NotificationsInSubstream<NegotiatedSubstream>,
/// Outbound substream that has been accepted by the remote. Being closed.
out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
/// See [`State::Closed::pending_opening`].
pending_opening: bool,
},
@@ -177,8 +190,15 @@ enum State {
/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
/// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`].
Opening {
/// Substream opened by the remote. If `Some`, has been accepted.
in_substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Outbound substream that has been accepted by the remote. Being closed. An outbound
/// substream request has been emitted towards libp2p if and only if this field is `None`.
out_substream_closing: Option<NotificationsOutSubstream<NegotiatedSubstream>>,
/// Substream re-opened by the remote. Has been accepted.
in_substream_reopened: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Substream opened by the remote. Being closed.
in_substream_closing: Option<NotificationsInSubstream<NegotiatedSubstream>>,
},
/// Protocol is in the "Open" state.
@@ -227,6 +247,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
handshake,
state: State::Closed {
pending_opening: false,
in_substream_closing: None,
in_substream_reopened: None,
out_substream_closing: None,
},
max_notification_size: max_size,
}
@@ -487,7 +510,16 @@ impl ProtocolsHandler for NotifsHandler {
) {
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed { pending_opening } => {
State::Closed {
ref mut pending_opening,
ref mut out_substream_closing,
ref mut in_substream_closing,
ref mut in_substream_reopened
}
if in_substream_closing.is_none() && in_substream_reopened.is_none()
=> {
debug_assert!(!(out_substream_closing.is_some() && *pending_opening));
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
@@ -496,9 +528,31 @@ impl ProtocolsHandler for NotifsHandler {
protocol_info.state = State::OpenDesiredByRemote {
in_substream: new_substream,
pending_opening,
out_substream_closing: out_substream_closing.take(),
pending_opening: *pending_opening,
};
},
State::Opening { ref mut in_substream_closing, ref mut in_substream_reopened, .. } => {
*in_substream_closing = None;
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream_reopened = Some(new_substream);
},
State::Open { ref mut in_substream, .. } if in_substream.is_none() => {
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
},
State::Closed { .. } |
State::Open { .. } |
State::OpenDesiredByRemote { .. } => {
// If a substream already exists, silently drop the new one.
// Note that we drop the substream, which will send an equivalent to a
@@ -509,19 +563,6 @@ impl ProtocolsHandler for NotifsHandler {
// to do.
return;
},
State::Opening { ref mut in_substream, .. } |
State::Open { ref mut in_substream, .. } => {
if in_substream.is_some() {
// Same remark as above.
return;
}
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
},
}
}
@@ -531,16 +572,24 @@ impl ProtocolsHandler for NotifsHandler {
protocol_index: Self::OutboundOpenInfo
) {
match self.protocols[protocol_index].state {
State::Closed { ref mut pending_opening } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } |
State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => {
debug_assert!(out_substream_closing.is_none());
debug_assert!(*pending_opening);
*out_substream_closing = Some(substream);
*pending_opening = false;
}
State::Open { .. } => {
error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
debug_assert!(false);
}
State::Opening { ref mut in_substream } => {
State::Opening {
ref mut in_substream_reopened, ref mut in_substream_closing,
ref mut out_substream_closing
} => {
debug_assert!(out_substream_closing.is_none());
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
@@ -554,7 +603,7 @@ impl ProtocolsHandler for NotifsHandler {
self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
out_substream: Some(substream),
in_substream: in_substream.take(),
in_substream: in_substream_reopened.take().or(in_substream_closing.take()),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
@@ -574,8 +623,13 @@ impl ProtocolsHandler for NotifsHandler {
NotifsHandlerIn::Open { protocol_index } => {
let protocol_info = &mut self.protocols[protocol_index];
match &mut protocol_info.state {
State::Closed { pending_opening } => {
if !*pending_opening {
State::Closed {
ref mut pending_opening,
ref mut in_substream_closing,
ref mut in_substream_reopened,
ref mut out_substream_closing
} => {
if !*pending_opening && out_substream_closing.is_none() {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone(),
@@ -588,14 +642,31 @@ impl ProtocolsHandler for NotifsHandler {
});
}
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
protocol_info.state = State::Opening {
in_substream: None,
in_substream_closing: in_substream_closing.take(),
in_substream_reopened: in_substream_reopened.take(),
out_substream_closing: out_substream_closing.take(),
};
},
State::OpenDesiredByRemote { pending_opening, in_substream } => {
State::OpenDesiredByRemote { .. } => {
// The state change is done in two steps because of borrowing issues.
let (pending_opening, out_substream_closing, mut in_substream) = match
mem::replace(&mut protocol_info.state,
State::Opening {
in_substream_closing: None, in_substream_reopened: None,
out_substream_closing: None,
})
{
State::OpenDesiredByRemote { pending_opening, out_substream_closing, in_substream, .. } =>
(pending_opening, out_substream_closing, in_substream),
_ => unreachable!()
};
let handshake_message = protocol_info.handshake.read().clone();
if !*pending_opening {
if !pending_opening && out_substream_closing.is_none() {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
handshake_message.clone(),
@@ -610,17 +681,13 @@ impl ProtocolsHandler for NotifsHandler {
in_substream.send_handshake(handshake_message);
// The state change is done in two steps because of borrowing issues.
let in_substream = match
mem::replace(&mut protocol_info.state, State::Opening { in_substream: None })
{
State::OpenDesiredByRemote { in_substream, .. } => in_substream,
_ => unreachable!()
};
protocol_info.state = State::Opening {
in_substream: Some(in_substream),
out_substream_closing,
in_substream_closing: None,
in_substream_reopened: Some(in_substream),
};
},
State::Opening { .. } |
State::Open { .. } => {
// As documented, it is forbidden to send an `Open` while there is already
@@ -632,15 +699,30 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::Close { protocol_index } => {
match self.protocols[protocol_index].state {
State::Open { .. } => {
match &mut self.protocols[protocol_index].state {
State::Open { in_substream, out_substream, .. } => {
if let Some(in_substream) = in_substream.as_mut() {
in_substream.set_close_desired();
}
self.protocols[protocol_index].state = State::Closed {
in_substream_closing: in_substream.take(),
in_substream_reopened: None,
out_substream_closing: out_substream.take(),
pending_opening: false,
};
},
State::Opening { .. } => {
State::Opening { in_substream_closing, in_substream_reopened, out_substream_closing } => {
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
let pending_opening = out_substream_closing.is_none();
if let Some(in_substream_reopened) = in_substream_reopened.as_mut() {
in_substream_reopened.set_close_desired();
}
self.protocols[protocol_index].state = State::Closed {
pending_opening: true,
in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()),
in_substream_reopened: None,
out_substream_closing: out_substream_closing.take(),
pending_opening,
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
@@ -649,8 +731,23 @@ impl ProtocolsHandler for NotifsHandler {
}
));
},
State::OpenDesiredByRemote { pending_opening, .. } => {
State::OpenDesiredByRemote { .. } => {
let (mut in_substream, pending_opening, out_substream_closing) = match mem::replace(
&mut self.protocols[protocol_index].state,
State::Closed { pending_opening: false, in_substream_closing: None,
in_substream_reopened: None, out_substream_closing: None,
}
) {
State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } =>
(in_substream, pending_opening, out_substream_closing),
_ => unreachable!("Can only enter this branch after OpenDesiredByRemote; qed")
};
in_substream.set_close_desired();
self.protocols[protocol_index].state = State::Closed {
in_substream_closing: Some(in_substream),
in_substream_reopened: None,
out_substream_closing,
pending_opening,
};
}
@@ -672,14 +769,30 @@ impl ProtocolsHandler for NotifsHandler {
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
) {
match self.protocols[num].state {
State::Closed { ref mut pending_opening } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
State::Closed { ref mut pending_opening, ref mut out_substream_closing, .. } |
State::OpenDesiredByRemote { ref mut pending_opening, ref mut out_substream_closing, .. } => {
debug_assert!(out_substream_closing.is_none());
debug_assert!(*pending_opening);
*pending_opening = false;
}
State::Opening { .. } => {
State::Opening {
ref mut out_substream_closing,
ref mut in_substream_closing,
ref mut in_substream_reopened,
..
} => {
debug_assert!(!(in_substream_reopened.is_some() && in_substream_closing.is_some()));
debug_assert!(out_substream_closing.is_none());
if let Some(in_substream_reopened) = in_substream_reopened.as_mut() {
in_substream_reopened.set_close_desired();
}
self.protocols[num].state = State::Closed {
out_substream_closing: None,
in_substream_closing: in_substream_reopened.take().or(in_substream_closing.take()),
in_substream_reopened: None,
pending_opening: false,
};
@@ -788,15 +901,67 @@ impl ProtocolsHandler for NotifsHandler {
}
}
// Try close outbound substreams that are marked for closing.
for protocol_index in 0..self.protocols.len() {
match &mut self.protocols[protocol_index].state {
State::Closed { out_substream_closing: ref mut substream @ Some(_), .. } |
State::OpenDesiredByRemote { out_substream_closing: ref mut substream @ Some(_), .. } |
State::Opening { out_substream_closing: ref mut substream @ Some(_), .. } => {
match Sink::poll_close(Pin::new(substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
Poll::Ready(_) => {
*substream = None;
if matches!(self.protocols[protocol_index].state, State::Opening { .. }) {
let protocol_info = &mut self.protocols[protocol_index];
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone(),
protocol_info.max_notification_size
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
});
}
}
}
}
_ => {}
}
if let State::Closed {
pending_opening,
out_substream_closing: None,
in_substream_closing,
in_substream_reopened: ref mut in_substream_reopened @ Some(_),
..
} = &mut self.protocols[protocol_index].state {
debug_assert!(!*pending_opening);
debug_assert!(in_substream_closing.is_none());
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
}
));
self.protocols[protocol_index].state = State::OpenDesiredByRemote {
in_substream: in_substream_reopened.take()
.expect("The if let above ensures that this is Some ; qed"),
out_substream_closing: None,
pending_opening: false,
};
}
}
// Poll inbound substreams.
for protocol_index in 0..self.protocols.len() {
// Inbound substreams being closed is always tolerated, except for the
// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
match &mut self.protocols[protocol_index].state {
State::Closed { .. } |
State::Open { in_substream: None, .. } |
State::Opening { in_substream: None } => {}
State::Open { in_substream: None, .. } => {}
State::Open { in_substream: in_substream @ Some(_), .. } => {
match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
@@ -812,13 +977,16 @@ impl ProtocolsHandler for NotifsHandler {
}
}
State::OpenDesiredByRemote { in_substream, pending_opening } => {
State::OpenDesiredByRemote { in_substream, pending_opening, out_substream_closing } => {
match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: *pending_opening,
in_substream_closing: None,
in_substream_reopened: None,
out_substream_closing: out_substream_closing.take(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired { protocol_index }
@@ -827,13 +995,40 @@ impl ProtocolsHandler for NotifsHandler {
}
}
State::Opening { in_substream: in_substream @ Some(_), .. } => {
State::Opening { in_substream_closing: None, in_substream_reopened: None, .. } |
State::Closed { in_substream_closing: None, in_substream_reopened: None, .. } => {}
State::Opening {
in_substream_closing: ref mut in_substream @ Some(_),
in_substream_reopened: None,
..
} |
State::Opening {
in_substream_closing: None,
in_substream_reopened: ref mut in_substream @ Some(_),
..
} |
State::Closed {
in_substream_closing: ref mut in_substream @ Some(_),
in_substream_reopened: None,
..
} |
State::Closed {
in_substream_closing: None,
in_substream_reopened: ref mut in_substream @ Some(_),
..
} => {
match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => *in_substream = None,
}
}
State::Opening { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } |
State::Closed { in_substream_closing: Some(_), in_substream_reopened: Some(_), .. } => {
debug_assert!(false);
}
}
}