Log the clogging networking messages (#1532)

This commit is contained in:
Pierre Krieger
2019-01-23 15:24:25 +01:00
committed by Gav Wood
parent cd86643f33
commit d30a8e0bc3
7 changed files with 46 additions and 11 deletions
@@ -166,6 +166,8 @@ pub enum BehaviourOut {
peer_id: PeerId, peer_id: PeerId,
/// Protocol which generated the message. /// Protocol which generated the message.
protocol_id: ProtocolId, protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
}, },
/// We have obtained debug information from a peer. /// We have obtained debug information from a peer.
@@ -189,8 +191,8 @@ impl From<CustomProtosOut> for BehaviourOut {
CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => { CustomProtosOut::CustomMessage { protocol_id, peer_id, data } => {
BehaviourOut::CustomMessage { protocol_id, peer_id, data } BehaviourOut::CustomMessage { protocol_id, peer_id, data }
} }
CustomProtosOut::Clogged { protocol_id, peer_id } => { CustomProtosOut::Clogged { protocol_id, peer_id, messages } => {
BehaviourOut::Clogged { protocol_id, peer_id } BehaviourOut::Clogged { protocol_id, peer_id, messages }
} }
} }
} }
@@ -115,6 +115,8 @@ pub enum CustomProtosOut {
peer_id: PeerId, peer_id: PeerId,
/// Protocol which has a problem. /// Protocol which has a problem.
protocol_id: ProtocolId, protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
}, },
} }
@@ -445,12 +447,13 @@ where
self.events.push(NetworkBehaviourAction::GenerateEvent(event)); self.events.push(NetworkBehaviourAction::GenerateEvent(event));
} }
CustomProtosHandlerOut::Clogged { protocol_id } => { CustomProtosHandlerOut::Clogged { protocol_id, messages } => {
warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \ warn!(target: "sub-libp2p", "Queue of packets to send to {:?} (protocol: {:?}) is \
pretty large", source, protocol_id); pretty large", source, protocol_id);
self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged { self.events.push(NetworkBehaviourAction::GenerateEvent(CustomProtosOut::Clogged {
peer_id: source, peer_id: source,
protocol_id, protocol_id,
messages,
})); }));
} }
} }
@@ -117,6 +117,8 @@ pub enum CustomProtosHandlerOut {
Clogged { Clogged {
/// Protocol which is clogged. /// Protocol which is clogged.
protocol_id: ProtocolId, protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
}, },
} }
@@ -298,9 +300,10 @@ where
self.substreams.push(substream); self.substreams.push(substream);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
}, },
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged))) => { Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
let event = CustomProtosHandlerOut::Clogged { let event = CustomProtosHandlerOut::Clogged {
protocol_id: substream.protocol_id() protocol_id: substream.protocol_id(),
messages,
}; };
self.substreams.push(substream); self.substreams.push(substream);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event)))
@@ -124,9 +124,13 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
pub enum RegisteredProtocolEvent { pub enum RegisteredProtocolEvent {
/// Received a message from the remote. /// Received a message from the remote.
Message(Bytes), Message(Bytes),
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too /// Diagnostic event indicating that the connection is clogged and we should avoid sending too
/// many messages to it. /// many messages to it.
Clogged, Clogged {
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
},
} }
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream> impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
@@ -159,7 +163,9 @@ where TSubstream: AsyncRead + AsyncWrite,
// if you remove the fuse, then we will always return early from this function and // if you remove the fuse, then we will always return early from this function and
// thus never read any message from the network. // thus never read any message from the network.
self.clogged_fuse = true; self.clogged_fuse = true;
return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged))) return Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged {
messages: self.send_queue.iter().cloned().collect(),
})))
} }
} else { } else {
self.clogged_fuse = false; self.clogged_fuse = false;
@@ -182,6 +182,8 @@ pub enum ServiceEvent {
node_index: NodeIndex, node_index: NodeIndex,
/// Protocol which generated the message. /// Protocol which generated the message.
protocol_id: ProtocolId, protocol_id: ProtocolId,
/// Copy of the messages that are within the buffer, for further diagnostic.
messages: Vec<Bytes>,
}, },
} }
@@ -378,11 +380,12 @@ impl Service {
data, data,
}))) })))
} }
Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id }))) => { Ok(Async::Ready(Some(BehaviourOut::Clogged { protocol_id, peer_id, messages }))) => {
let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour"); let node_index = *self.index_by_id.get(&peer_id).expect("index_by_id is always kept in sync with the state of the behaviour");
break Ok(Async::Ready(Some(ServiceEvent::Clogged { break Ok(Async::Ready(Some(ServiceEvent::Clogged {
node_index, node_index,
protocol_id, protocol_id,
messages,
}))) })))
} }
Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => { Ok(Async::Ready(Some(BehaviourOut::Identified { peer_id, info }))) => {
+18 -1
View File
@@ -16,6 +16,7 @@
use std::collections::{HashMap, HashSet, BTreeMap}; use std::collections::{HashMap, HashSet, BTreeMap};
use std::cmp; use std::cmp;
use std::io::Cursor;
use std::sync::Arc; use std::sync::Arc;
use std::time; use std::time;
use parking_lot::RwLock; use parking_lot::RwLock;
@@ -353,13 +354,29 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// Called as a back-pressure mechanism if the networking detects that the peer cannot process
/// our messaging rate fast enough. /// our messaging rate fast enough.
pub fn on_clogged_peer(&self, io: &mut SyncIo, who: NodeIndex) { pub fn on_clogged_peer<'a>(
&self,
_io: &mut SyncIo,
who: NodeIndex,
clogging_messages: impl ExactSizeIterator<Item = &'a [u8]>
) {
// We don't do anything but print some diagnostics for now. // We don't do anything but print some diagnostics for now.
if let Some(peer) = self.context_data.peers.read().get(&who) { if let Some(peer) = self.context_data.peers.read().get(&who) {
debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks, who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks,
peer.best_hash, peer.best_number); peer.best_hash, peer.best_number);
} else {
debug!(target: "sync", "Peer clogged before being properly connected");
}
debug!(target: "sync", "{} clogging messages:", clogging_messages.len());
for msg_bytes in clogging_messages {
if let Some(msg) = <Message<B> as Decode>::decode(&mut Cursor::new(msg_bytes)) {
debug!(target: "sync", "{:?}", msg);
} else {
debug!(target: "sync", "{:?}", msg_bytes)
}
} }
} }
+3 -2
View File
@@ -401,8 +401,9 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
NetworkServiceEvent::CustomMessage { node_index, data, .. } => { NetworkServiceEvent::CustomMessage { node_index, data, .. } => {
protocol.handle_packet(&mut net_sync, node_index, &data); protocol.handle_packet(&mut net_sync, node_index, &data);
} }
NetworkServiceEvent::Clogged { node_index, .. } => { NetworkServiceEvent::Clogged { node_index, messages, .. } => {
protocol.on_clogged_peer(&mut net_sync, node_index); protocol.on_clogged_peer(&mut net_sync, node_index,
messages.iter().map(|d| d.as_ref()));
} }
}; };