mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 06:57:58 +00:00
Wait for all notifications protocols to be open before reporting opening (#6821)
* Wait for all notifications protocols to be open before reporting opening * Update client/network/src/protocol/generic_proto/handler/notif_out.rs Co-authored-by: Max Inden <mail@max-inden.de> * Concern * Fix attempt * Another fix attempt * Update client/network/src/protocol/generic_proto/handler/group.rs Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: parity-processbot <> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@@ -107,9 +107,17 @@ pub struct NotifsHandler {
|
||||
/// Handlers for outbound substreams, and the initial handshake message we send.
|
||||
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,
|
||||
|
||||
/// Whether we are the connection dialer or listener.
|
||||
endpoint: ConnectedPoint,
|
||||
|
||||
/// Handler for backwards-compatibility.
|
||||
legacy: LegacyProtoHandler,
|
||||
|
||||
/// In the situation where `legacy.is_open()` is true, but we haven't sent out any
|
||||
/// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy
|
||||
/// substream.
|
||||
pending_legacy_handshake: Option<Vec<u8>>,
|
||||
|
||||
/// State of this handler.
|
||||
enabled: EnabledState,
|
||||
|
||||
@@ -123,6 +131,9 @@ pub struct NotifsHandler {
|
||||
/// We use two different channels in order to have two different channel sizes, but from the
|
||||
/// receiving point of view, the two channels are the same.
|
||||
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
|
||||
///
|
||||
/// Contains `Some` if and only if it has been reported to the user that the substreams are
|
||||
/// open.
|
||||
notifications_sink_rx: Option<
|
||||
stream::Select<
|
||||
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
|
||||
@@ -159,7 +170,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
|
||||
.into_iter()
|
||||
.map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg))
|
||||
.collect(),
|
||||
endpoint: connected_point.clone(),
|
||||
legacy: self.legacy.into_handler(remote_peer_id, connected_point),
|
||||
pending_legacy_handshake: None,
|
||||
enabled: EnabledState::Initial,
|
||||
pending_in: Vec::new(),
|
||||
notifications_sink_rx: None,
|
||||
@@ -617,87 +630,80 @@ impl ProtocolsHandler for NotifsHandler {
|
||||
}
|
||||
}
|
||||
|
||||
if let Poll::Ready(ev) = self.legacy.poll(cx) {
|
||||
return match ev {
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: protocol.map_upgrade(EitherUpgrade::B),
|
||||
info: None,
|
||||
}),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
|
||||
endpoint,
|
||||
received_handshake,
|
||||
..
|
||||
}) => {
|
||||
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
|
||||
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
|
||||
let notifications_sink = NotificationsSink {
|
||||
inner: Arc::new(NotificationsSinkInner {
|
||||
async_channel: FuturesMutex::new(async_tx),
|
||||
sync_channel: Mutex::new(sync_tx),
|
||||
}),
|
||||
};
|
||||
|
||||
debug_assert!(self.notifications_sink_rx.is_none());
|
||||
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));
|
||||
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink }
|
||||
))
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => {
|
||||
// We consciously drop the receivers despite notifications being potentially
|
||||
// still buffered up.
|
||||
debug_assert!(self.notifications_sink_rx.is_some());
|
||||
self.notifications_sink_rx = None;
|
||||
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Closed { endpoint, reason }
|
||||
))
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::CustomMessage { message }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::ProtocolError { is_severe, error }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Close(err) =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
|
||||
}
|
||||
}
|
||||
|
||||
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
|
||||
while let Poll::Ready(ev) = handler.poll(cx) {
|
||||
// If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy
|
||||
// substream is open but the user isn't aware yet of the substreams being open.
|
||||
// When that is the case, neither the legacy substream nor the incoming notifications
|
||||
// substreams should be polled, otherwise there is a risk of receiving messages from them.
|
||||
if self.pending_legacy_handshake.is_none() {
|
||||
while let Poll::Ready(ev) = self.legacy.poll(cx) {
|
||||
match ev {
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
|
||||
error!("Incoming substream handler tried to open a substream"),
|
||||
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
|
||||
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
|
||||
match self.enabled {
|
||||
EnabledState::Initial => self.pending_in.push(handler_num),
|
||||
EnabledState::Enabled => {
|
||||
// We create `handshake_message` on a separate line to be sure
|
||||
// that the lock is released as soon as possible.
|
||||
let handshake_message = handshake_message.read().clone();
|
||||
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
|
||||
},
|
||||
EnabledState::Disabled =>
|
||||
handler.inject_event(NotifsInHandlerIn::Refuse),
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
|
||||
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
|
||||
// Note that right now the legacy substream has precedence over
|
||||
// everything. If it is not open, then we consider that nothing is open.
|
||||
if self.legacy.is_open() {
|
||||
let msg = NotifsHandlerOut::Notification {
|
||||
message,
|
||||
protocol_name: handler.protocol_name().to_owned().into(),
|
||||
};
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
|
||||
}
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: protocol.map_upgrade(EitherUpgrade::B),
|
||||
info: None,
|
||||
}),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
|
||||
received_handshake,
|
||||
..
|
||||
}) => {
|
||||
self.pending_legacy_handshake = Some(received_handshake);
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending;
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => {
|
||||
// We consciously drop the receivers despite notifications being potentially
|
||||
// still buffered up.
|
||||
debug_assert!(self.notifications_sink_rx.is_some());
|
||||
self.notifications_sink_rx = None;
|
||||
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Closed { endpoint: self.endpoint.clone(), reason }
|
||||
))
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => {
|
||||
debug_assert!(self.notifications_sink_rx.is_some());
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::CustomMessage { message }
|
||||
))
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::ProtocolError { is_severe, error }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Close(err) =>
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
|
||||
}
|
||||
}
|
||||
|
||||
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
|
||||
while let Poll::Ready(ev) = handler.poll(cx) {
|
||||
match ev {
|
||||
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
|
||||
error!("Incoming substream handler tried to open a substream"),
|
||||
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
|
||||
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
|
||||
match self.enabled {
|
||||
EnabledState::Initial => self.pending_in.push(handler_num),
|
||||
EnabledState::Enabled => {
|
||||
// We create `handshake_message` on a separate line to be sure
|
||||
// that the lock is released as soon as possible.
|
||||
let handshake_message = handshake_message.read().clone();
|
||||
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
|
||||
},
|
||||
EnabledState::Disabled =>
|
||||
handler.inject_event(NotifsInHandlerIn::Refuse),
|
||||
},
|
||||
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
|
||||
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
|
||||
if self.notifications_sink_rx.is_some() {
|
||||
let msg = NotifsHandlerOut::Notification {
|
||||
message,
|
||||
protocol_name: handler.protocol_name().to_owned().into(),
|
||||
};
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -725,6 +731,30 @@ impl ProtocolsHandler for NotifsHandler {
|
||||
}
|
||||
}
|
||||
|
||||
if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) {
|
||||
if let Some(handshake) = self.pending_legacy_handshake.take() {
|
||||
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
|
||||
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
|
||||
let notifications_sink = NotificationsSink {
|
||||
inner: Arc::new(NotificationsSinkInner {
|
||||
async_channel: FuturesMutex::new(async_tx),
|
||||
sync_channel: Mutex::new(sync_tx),
|
||||
}),
|
||||
};
|
||||
|
||||
debug_assert!(self.notifications_sink_rx.is_none());
|
||||
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));
|
||||
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Open {
|
||||
endpoint: self.endpoint.clone(),
|
||||
received_handshake: handshake,
|
||||
notifications_sink
|
||||
}
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,16 +222,12 @@ pub enum LegacyProtoHandlerOut {
|
||||
/// Handshake message that has been sent to us.
|
||||
/// This is normally a "Status" message, but this out of the concern of this code.
|
||||
received_handshake: Vec<u8>,
|
||||
/// The connected endpoint.
|
||||
endpoint: ConnectedPoint,
|
||||
},
|
||||
|
||||
/// Closed a custom protocol with the remote.
|
||||
CustomProtocolClosed {
|
||||
/// Reason why the substream closed, for diagnostic purposes.
|
||||
reason: Cow<'static, str>,
|
||||
/// The connected endpoint.
|
||||
endpoint: ConnectedPoint,
|
||||
},
|
||||
|
||||
/// Receives a message on a custom protocol substream.
|
||||
@@ -250,18 +246,6 @@ pub enum LegacyProtoHandlerOut {
|
||||
}
|
||||
|
||||
impl LegacyProtoHandler {
|
||||
/// Returns true if the legacy substream is currently open.
|
||||
pub fn is_open(&self) -> bool {
|
||||
match &self.state {
|
||||
ProtocolState::Init { substreams, .. } => !substreams.is_empty(),
|
||||
ProtocolState::Opening { .. } => false,
|
||||
ProtocolState::Normal { substreams, .. } => !substreams.is_empty(),
|
||||
ProtocolState::Disabled { .. } => false,
|
||||
ProtocolState::KillAsap => false,
|
||||
ProtocolState::Poisoned => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Enables the handler.
|
||||
fn enable(&mut self) {
|
||||
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
|
||||
@@ -285,7 +269,6 @@ impl LegacyProtoHandler {
|
||||
} else {
|
||||
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
|
||||
version: incoming[0].0.protocol_version(),
|
||||
endpoint: self.endpoint.clone(),
|
||||
received_handshake: mem::replace(&mut incoming[0].1, Vec::new()),
|
||||
};
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
|
||||
@@ -399,7 +382,6 @@ impl LegacyProtoHandler {
|
||||
if substreams.is_empty() {
|
||||
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
|
||||
reason: "Legacy substream clogged".into(),
|
||||
endpoint: self.endpoint.clone()
|
||||
};
|
||||
self.state = ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
@@ -413,7 +395,6 @@ impl LegacyProtoHandler {
|
||||
if substreams.is_empty() {
|
||||
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
|
||||
reason: "All substreams have been closed by the remote".into(),
|
||||
endpoint: self.endpoint.clone()
|
||||
};
|
||||
self.state = ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
@@ -426,7 +407,6 @@ impl LegacyProtoHandler {
|
||||
if substreams.is_empty() {
|
||||
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
|
||||
reason: format!("Error on the last substream: {:?}", err).into(),
|
||||
endpoint: self.endpoint.clone()
|
||||
};
|
||||
self.state = ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
@@ -492,7 +472,6 @@ impl LegacyProtoHandler {
|
||||
ProtocolState::Opening { .. } => {
|
||||
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
|
||||
version: substream.protocol_version(),
|
||||
endpoint: self.endpoint.clone(),
|
||||
received_handshake,
|
||||
};
|
||||
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
|
||||
|
||||
@@ -203,6 +203,22 @@ impl NotifsOutHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if there has been an attempt to open the substream, but the remote refused
|
||||
/// the substream.
|
||||
///
|
||||
/// Always returns `false` if the handler is in a disabled state.
|
||||
pub fn is_refused(&self) -> bool {
|
||||
match &self.state {
|
||||
State::Disabled => false,
|
||||
State::DisabledOpening => false,
|
||||
State::DisabledOpen(_) => false,
|
||||
State::Opening { .. } => false,
|
||||
State::Refused => true,
|
||||
State::Open { .. } => false,
|
||||
State::Poisoned => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the name of the protocol that we negotiate.
|
||||
pub fn protocol_name(&self) -> &[u8] {
|
||||
&self.protocol_name
|
||||
|
||||
Reference in New Issue
Block a user