Report when peer is clogged (#1528)

This commit is contained in:
Pierre Krieger
2019-01-23 14:30:20 +01:00
committed by Gav Wood
parent 28c37ef419
commit cd86643f33
7 changed files with 107 additions and 16 deletions
+14 -3
View File
@@ -160,6 +160,14 @@ pub enum BehaviourOut {
data: Bytes,
},
/// A substream with a remote is clogged. We should avoid sending more data to it if possible.
Clogged {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Protocol which generated the message.
protocol_id: ProtocolId,
},
/// We have obtained debug information from a peer.
Identified {
/// Id of the peer that has been identified.
@@ -174,13 +182,16 @@ impl From<CustomProtosOut> for BehaviourOut {
match other {
CustomProtosOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint } => {
BehaviourOut::CustomProtocolOpen { protocol_id, version, peer_id, endpoint }
			}
CustomProtosOut::CustomProtocolClosed { protocol_id, peer_id, result } => {
BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }
			}
CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => {
BehaviourOut::CustomMessage { protocol_id, peer_id, data }
			}
CustomProtosOut::Clogged { protocol_id, peer_id } => {
BehaviourOut::Clogged { protocol_id, peer_id }
}
}
}
}
@@ -107,6 +107,15 @@ pub enum CustomProtosOut {
/// Data that has been received.
data: Bytes,
},
	/// The substream used by the protocol is pretty large. We should avoid sending more
	/// data on it if possible.
Clogged {
/// Id of the peer which is clogged.
peer_id: PeerId,
/// Protocol which has a problem.
protocol_id: ProtocolId,
},
}
impl<TSubstream> CustomProtos<TSubstream> {
@@ -436,6 +445,14 @@ where
self.events.push(NetworkBehaviourAction::GenerateEvent(event));
}
CustomProtosHandlerOut::Clogged { protocol_id } => {
warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \
pretty large", source, protocol_id);
self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged {
peer_id: source,
protocol_id,
}));
}
}
}
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::ProtocolId;
use crate::custom_proto::upgrade::{RegisteredProtocol, RegisteredProtocols, RegisteredProtocolSubstream, RegisteredProtocolEvent};
use bytes::Bytes;
use futures::prelude::*;
use libp2p::core::{
@@ -111,6 +111,13 @@ pub enum CustomProtosHandlerOut {
/// Data that has been received.
data: Bytes,
},
/// A substream to the remote is clogged. The send buffer is very large, and we should print
/// a diagnostic message and/or avoid sending more data.
Clogged {
/// Protocol which is clogged.
protocol_id: ProtocolId,
},
}
impl<TSubstream> CustomProtosHandler<TSubstream>
@@ -283,7 +290,7 @@ where
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
match substream.poll() {
				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data)))) => {
let event = CustomProtosHandlerOut::CustomMessage {
protocol_id: substream.protocol_id(),
data
@@ -291,6 +298,13 @@ where
self.substreams.push(substream);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
},
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged))) => {
let event = CustomProtosHandlerOut::Clogged {
protocol_id: substream.protocol_id()
};
self.substreams.push(substream);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
},
Ok(Async::NotReady) =>
self.substreams.push(substream),
Ok(Async::Ready(None)) => {
@@ -18,7 +18,6 @@ use crate::ProtocolId;
use bytes::Bytes;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::tokio_codec::Framed;
use log::debug;
use std::{collections::VecDeque, io, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -81,6 +80,9 @@ pub struct RegisteredProtocolSubstream<TSubstream> {
protocol_id: ProtocolId,
/// Version of the protocol that was negotiated.
protocol_version: u8,
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
}
impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
@@ -114,21 +116,23 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
}
self.send_queue.push_back(data);
// If the length of the queue goes over a certain arbitrary threshold, we print a warning.
if self.send_queue.len() >= 2048 {
// TODO: this used to be a warning, but is now a `debug` in order to avoid too much
// noise in the logs; see https://github.com/paritytech/substrate/issues/1414
debug!(target: "sub-libp2p", "Queue of packets to send over substream is pretty \
large: {}", self.send_queue.len());
}
}
}
/// Event produced by the `RegisteredProtocolSubstream`.
#[derive(Debug, Clone)]
pub enum RegisteredProtocolEvent {
/// Received a message from the remote.
Message(Bytes),
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
/// many messages to it.
Clogged,
}
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite,
{
	type Item = RegisteredProtocolEvent;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -148,6 +152,19 @@ where TSubstream: AsyncRead + AsyncWrite,
}
}
// Indicating that the remote is clogged if that's the case.
if self.send_queue.len() >= 2048 {
if !self.clogged_fuse {
// Note: this fuse is important not just for preventing us from flooding the logs;
// if you remove the fuse, then we will always return early from this function and
// thus never read any message from the network.
self.clogged_fuse = true;
return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged)))
}
} else {
self.clogged_fuse = false;
}
// Flushing if necessary.
if self.requires_poll_complete {
if let Async::Ready(()) = self.inner.poll_complete()? {
@@ -158,7 +175,8 @@ where TSubstream: AsyncRead + AsyncWrite,
// Receiving incoming packets.
// Note that `inner` is wrapped in a `Fuse`, therefore we can poll it forever.
match self.inner.poll()? {
			Async::Ready(Some(data)) =>
				Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(data.freeze())))),
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
Ok(Async::Ready(None))
@@ -225,6 +243,7 @@ where TSubstream: AsyncRead + AsyncWrite,
inner: framed.fuse(),
protocol_id: self.id,
protocol_version: info.version,
clogged_fuse: false,
})
}
}
@@ -175,6 +175,14 @@ pub enum ServiceEvent {
/// Data that has been received.
data: Bytes,
},
/// The substream with a node is clogged. We should avoid sending data to it if possible.
Clogged {
/// Index of the node.
node_index: NodeIndex,
/// Protocol which generated the message.
protocol_id: ProtocolId,
},
}
/// Network service. Must be polled regularly in order for the networking to work.
@@ -370,6 +378,13 @@ impl Service {
data,
})))
}
Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::Clogged {
node_index,
protocol_id,
})))
}
Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => {
// Contrary to the other events, this one can happen even on nodes which don't
// have any open custom protocol slot. Therefore it is not necessarily in the