diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs index 63b460bf8c..dd38826496 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -79,13 +79,14 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { DeniedUpgrade } - fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler { + fn into_handler(self, peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler { NotifsOutHandler { protocol_name: self.protocol_name, when_connection_open: Instant::now(), queue_size_report: self.queue_size_report, state: State::Disabled, events_queue: SmallVec::new(), + peer_id: peer_id.clone(), } } } @@ -116,6 +117,9 @@ pub struct NotifsOutHandler { /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. events_queue: SmallVec<[ProtocolsHandlerEvent; 16]>, + + /// Who we are connected to. + peer_id: PeerId, } /// Our relationship with the node we're connected to. @@ -308,16 +312,17 @@ impl ProtocolsHandler for NotifsOutHandler { NotifsOutHandlerIn::Send(msg) => if let State::Open { substream, .. } = &mut self.state { - if let Some(Ok(_)) = substream.send(msg).now_or_never() { - if let Some(metric) = &self.queue_size_report { - metric.observe(substream.queue_len() as f64); - } - } else { + if substream.push_message(msg).is_err() { log::warn!( target: "sub-libp2p", - "📞 Failed to push message to queue, dropped it" + "📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})", + self.peer_id, + self.protocol_name, ); } + if let Some(metric) = &self.queue_size_report { + metric.observe(substream.queue_len() as f64); + } } else { // This is an API misuse. log::warn!( diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs index de42d85c80..f626110a33 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -43,8 +43,7 @@ use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. const MAX_HANDSHAKE_SIZE: usize = 1024; -/// Maximum number of buffered messages before we consider the remote unresponsive and kill the -/// substream. +/// Maximum number of buffered messages before we refuse to accept more. const MAX_PENDING_MESSAGES: usize = 256; /// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional @@ -285,6 +284,18 @@ impl NotificationsOutSubstream { pub fn queue_len(&self) -> u32 { u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value()) } + + /// Push a message to the queue of messages. + /// + /// This has the same effect as the `Sink::start_send` implementation. + pub fn push_message(&mut self, item: Vec) -> Result<(), NotificationsOutError> { + if self.messages_queue.len() >= MAX_PENDING_MESSAGES { + return Err(NotificationsOutError::Clogged); + } + + self.messages_queue.push_back(item); + Ok(()) + } } impl Sink> for NotificationsOutSubstream @@ -297,12 +308,7 @@ impl Sink> for NotificationsOutSubstream } fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - if self.messages_queue.len() >= MAX_PENDING_MESSAGES { - return Err(NotificationsOutError::Clogged); - } - - self.messages_queue.push_back(item); - Ok(()) + self.push_message(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> {