diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index a0c49fa592..db0385bea8 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -365,10 +365,12 @@ struct NotificationsSinkInner { /// Sender to use in asynchronous contexts. Uses an asynchronous mutex. async_channel: FuturesMutex>, /// Sender to use in synchronous contexts. Uses a synchronous mutex. + /// Contains `None` if the channel was full at some point, in which case the channel will + /// be closed in the near future anyway. /// This channel has a large capacity and is meant to be used in contexts where /// back-pressure cannot be properly exerted. /// It will be removed in a future version. - sync_channel: Mutex>, + sync_channel: Mutex>>, } /// Message emitted through the [`NotificationsSink`] and processed by the background task @@ -400,14 +402,20 @@ impl NotificationsSink { /// This method will be removed in a future version. pub fn send_sync_notification<'a>(&'a self, message: impl Into>) { let mut lock = self.inner.sync_channel.lock(); - let result = - lock.try_send(NotificationsSinkMessage::Notification { message: message.into() }); - if result.is_err() { - // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the - // buffer, and therefore `try_send` will succeed. - let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); - debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected())); + if let Some(tx) = lock.as_mut() { + let result = + tx.try_send(NotificationsSinkMessage::Notification { message: message.into() }); + + if result.is_err() { + // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the + // buffer, and therefore `try_send` will succeed. + let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose); + debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected())); + + // Destroy the sender in order to not send more `ForceClose` messages. + *lock = None; + } } } @@ -554,7 +562,7 @@ impl ProtocolsHandler for NotifsHandler { inner: Arc::new(NotificationsSinkInner { peer_id: self.peer_id, async_channel: FuturesMutex::new(async_tx), - sync_channel: Mutex::new(sync_tx), + sync_channel: Mutex::new(Some(sync_tx)), }), };