Don't send ForceClose repeatedly in send_sync_notification (#10348)

This commit is contained in:
Pierre Krieger
2021-11-25 09:30:07 +01:00
committed by GitHub
parent d94027edb4
commit 5e2b93c2ea
@@ -365,10 +365,12 @@ struct NotificationsSinkInner {
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
/// Sender to use in synchronous contexts. Uses a synchronous mutex.
/// Contains `None` if the channel was full at some point, in which case the channel will
/// be closed in the near future anyway.
/// This channel has a large capacity and is meant to be used in contexts where
/// back-pressure cannot be properly exerted.
/// It will be removed in a future version.
sync_channel: Mutex<mpsc::Sender<NotificationsSinkMessage>>,
sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
}
/// Message emitted through the [`NotificationsSink`] and processed by the background task
@@ -400,14 +402,20 @@ impl NotificationsSink {
/// This method will be removed in a future version.
pub fn send_sync_notification<'a>(&'a self, message: impl Into<Vec<u8>>) {
let mut lock = self.inner.sync_channel.lock();
let result =
lock.try_send(NotificationsSinkMessage::Notification { message: message.into() });
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
// buffer, and therefore `try_send` will succeed.
let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose);
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
if let Some(tx) = lock.as_mut() {
let result =
tx.try_send(NotificationsSinkMessage::Notification { message: message.into() });
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
// buffer, and therefore `try_send` will succeed.
let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
// Destroy the sender in order to not send more `ForceClose` messages.
*lock = None;
}
}
}
@@ -554,7 +562,7 @@ impl ProtocolsHandler for NotifsHandler {
inner: Arc::new(NotificationsSinkInner {
peer_id: self.peer_id,
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
sync_channel: Mutex::new(Some(sync_tx)),
}),
};