From d30a8e0bc399f120dc00ed6d2ab00e4b8ff4aa94 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 23 Jan 2019 15:24:25 +0100 Subject: [PATCH] Log the clogging networking messages (#1532) --- .../core/network-libp2p/src/behaviour.rs | 6 ++++-- .../src/custom_proto/behaviour.rs | 5 ++++- .../src/custom_proto/handler.rs | 7 +++++-- .../src/custom_proto/upgrade.rs | 10 ++++++++-- .../core/network-libp2p/src/service_task.rs | 5 ++++- substrate/core/network/src/protocol.rs | 19 ++++++++++++++++++- substrate/core/network/src/service.rs | 5 +++-- 7 files changed, 46 insertions(+), 11 deletions(-) diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index 52b03c8335..d29be12307 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -166,6 +166,8 @@ pub enum BehaviourOut { peer_id: PeerId, /// Protocol which generated the message. protocol_id: ProtocolId, + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec, }, /// We have obtained debug information from a peer. @@ -189,8 +191,8 @@ impl From for BehaviourOut { CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => { BehaviourOut::CustomMessage { protocol_id, peer_id, data } } - CustomProtosOut::Clogged { protocol_id, peer_id } => { - BehaviourOut::Clogged { protocol_id, peer_id } + CustomProtosOut::Clogged { protocol_id, peer_id, messages } => { + BehaviourOut::Clogged { protocol_id, peer_id, messages } } } } diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index b76e5adcd6..6fe68e308c 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -115,6 +115,8 @@ pub enum CustomProtosOut { peer_id: PeerId, /// Protocol which has a problem. protocol_id: ProtocolId, + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec, }, } @@ -445,12 +447,13 @@ where self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } - CustomProtosHandlerOut::Clogged { protocol_id } => { + CustomProtosHandlerOut::Clogged { protocol_id, messages } => { warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \ pretty large", source, protocol_id); self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged { peer_id: source, protocol_id, + messages, })); } } diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index dac707ca16..c90276095d 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -117,6 +117,8 @@ pub enum CustomProtosHandlerOut { Clogged { /// Protocol which is clogged. protocol_id: ProtocolId, + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec, }, } @@ -298,9 +300,10 @@ where self.substreams.push(substream); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged))) => { + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { let event = CustomProtosHandlerOut::Clogged { - protocol_id: substream.protocol_id() + protocol_id: substream.protocol_id(), + messages, }; self.substreams.push(substream); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs index 69de76cc35..a2dfcf724a 100644 --- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs +++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs @@ -124,9 +124,13 @@ impl RegisteredProtocolSubstream { pub enum RegisteredProtocolEvent { /// Received a message from the remote. Message(Bytes), + /// Diagnostic event indicating that the connection is clogged and we should avoid sending too /// many messages to it. - Clogged, + Clogged { + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec, + }, } impl Stream for RegisteredProtocolSubstream @@ -159,7 +163,9 @@ where TSubstream: AsyncRead + AsyncWrite, // if you remove the fuse, then we will always return early from this function and // thus never read any message from the network. self.clogged_fuse = true; - return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged))) + return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { + messages: self.send_queue.iter().cloned().collect(), + }))) } } else { self.clogged_fuse = false; diff --git a/substrate/core/network-libp2p/src/service_task.rs b/substrate/core/network-libp2p/src/service_task.rs index 1ccba87a53..390bb86ebc 100644 --- a/substrate/core/network-libp2p/src/service_task.rs +++ b/substrate/core/network-libp2p/src/service_task.rs @@ -182,6 +182,8 @@ pub enum ServiceEvent { node_index: NodeIndex, /// Protocol which generated the message. protocol_id: ProtocolId, + /// Copy of the messages that are within the buffer, for further diagnostic. + messages: Vec, }, } @@ -378,11 +380,12 @@ impl Service { data, }))) } - Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id }))) => { + Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id, messages }))) => { let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); break Ok(Async::Ready(Some(ServiceEvent::Clogged { node_index, protocol_id, + messages, }))) } Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => { diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index ca7f6889b1..c89da41d6e 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet, BTreeMap}; use std::cmp; +use std::io::Cursor; use std::sync::Arc; use std::time; use parking_lot::RwLock; @@ -353,13 +354,29 @@ impl, H: ExHashT> Protocol { /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, io: &mut SyncIo, who: NodeIndex) { + pub fn on_clogged_peer<'a>( + &self, + _io: &mut SyncIo, + who: NodeIndex, + clogging_messages: impl ExactSizeIterator + ) { // We don't do anything but print some diagnostics for now. if let Some(peer) = self.context_data.peers.read().get(&who) { debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks, peer.best_hash, peer.best_number); + } else { + debug!(target: "sync", "Peer clogged before being properly connected"); + } + + debug!(target: "sync", "{} clogging messages:", clogging_messages.len()); + for msg_bytes in clogging_messages { + if let Some(msg) = as Decode>::decode(&mut Cursor::new(msg_bytes)) { + debug!(target: "sync", "{:?}", msg); + } else { + debug!(target: "sync", "{:?}", msg_bytes) + } } } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index c05eff9586..952fb4cc8f 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -401,8 +401,9 @@ fn run_thread, H: ExHashT>( NetworkServiceEvent::CustomMessage { node_index, data, .. } => { protocol.handle_packet(&mut net_sync, node_index, &data); } - NetworkServiceEvent::Clogged { node_index, .. } => { - protocol.on_clogged_peer(&mut net_sync, node_index); + NetworkServiceEvent::Clogged { node_index, messages, .. } => { + protocol.on_clogged_peer(&mut net_sync, node_index, + messages.iter().map(|d| d.as_ref())); } };