diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index d98ba8d323..601c7d9c99 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -99,9 +99,6 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; mod rep { use sc_peerset::ReputationChange as Rep; - /// Reputation change when a peer is "clogged", meaning that it's not fast enough to process our - /// messages. - pub const CLOGGED_PEER: Rep = Rep::new(-(1 << 12), "Clogged message queue"); /// Reputation change when a peer doesn't respond in time to our messages. pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); /// Reputation change when we are a light client and a peer is behind us. @@ -742,22 +739,6 @@ impl Protocol { } } - /// Called as a back-pressure mechanism if the networking detects that the peer cannot process - /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, who: PeerId, _msg: Option>) { - self.peerset_handle.report_peer(who.clone(), rep::CLOGGED_PEER); - - // Print some diagnostics. - if let Some(peer) = self.context_data.peers.get(&who) { - debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ - known_transactions: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", - who, peer.info.protocol_version, peer.info.roles, peer.known_transactions, peer.known_blocks, - peer.info.best_hash, peer.info.best_number); - } else { - debug!(target: "sync", "Peer clogged before being properly connected"); - } - } - fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}", request.id, @@ -2101,15 +2082,6 @@ impl NetworkBehaviour for Protocol { CustomMessageOutcome::None } } - GenericProtoOut::Clogged { peer_id, messages } => { - debug!(target: "sync", "{} clogging messages:", messages.len()); - for msg in messages.into_iter().take(5) { - let message: Option> = Decode::decode(&mut &msg[..]).ok(); - debug!(target: "sync", "{:?}", message); - self.on_clogged_peer(peer_id.clone(), message); - } - CustomMessageOutcome::None - } }; if let CustomMessageOutcome::None = outcome { diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index 48b75b6321..75f011d9f8 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -310,15 +310,6 @@ pub enum GenericProtoOut { /// Message that has been received. message: BytesMut, }, - - /// The substream used by the protocol is pretty large. We should print avoid sending more - /// messages on it if possible. - Clogged { - /// Id of the peer which is clogged. - peer_id: PeerId, - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, - }, } impl GenericProto { @@ -1312,18 +1303,6 @@ impl NetworkBehaviour for GenericProto { self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } - NotifsHandlerOut::Clogged { messages } => { - debug_assert!(self.is_open(&source)); - trace!(target: "sub-libp2p", "Handler({:?}) => Clogged", source); - trace!(target: "sub-libp2p", "External API <= Clogged({:?})", source); - warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \ - pretty large", source); - self.events.push_back(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged { - peer_id: source, - messages, - })); - } - // Don't do anything for non-severe errors except report them. NotifsHandlerOut::ProtocolError { is_severe, ref error } if !is_severe => { debug!(target: "sub-libp2p", "Handler({:?}) => Benign protocol error: {:?}", diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs index ed3e564223..1364ecf39d 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -215,13 +215,6 @@ pub enum NotifsHandlerOut { message: BytesMut, }, - /// A substream to the remote is clogged. The send buffer is very large, and we should print - /// a diagnostic message and/or avoid sending more data. - Clogged { - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, - }, - /// An error has happened on the protocol level with this node. ProtocolError { /// If true the error is severe, such as a protocol violation. @@ -484,10 +477,6 @@ impl ProtocolsHandler for NotifsHandler { Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CustomMessage { message } )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) => - Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Clogged { messages } - )), ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::ProtocolError { is_severe, error } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs index c7de2d265e..1469380fb7 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -236,13 +236,6 @@ pub enum LegacyProtoHandlerOut { message: BytesMut, }, - /// A substream to the remote is clogged. The send buffer is very large, and we should print - /// a diagnostic message and/or avoid sending more data. - Clogged { - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, - }, - /// An error has happened on the protocol level with this node. ProtocolError { /// If true the error is severe, such as a protocol violation. @@ -395,13 +388,19 @@ impl LegacyProtoHandler { self.state = ProtocolState::Normal { substreams, shutdown }; return Some(ProtocolsHandlerEvent::Custom(event)); }, - Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged { messages }))) => { - let event = LegacyProtoHandlerOut::Clogged { - messages, - }; - substreams.push(substream); - self.state = ProtocolState::Normal { substreams, shutdown }; - return Some(ProtocolsHandlerEvent::Custom(event)); + Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) => { + shutdown.push(substream); + if substreams.is_empty() { + let event = LegacyProtoHandlerOut::CustomProtocolClosed { + reason: "Legacy substream clogged".into(), + endpoint: self.endpoint.clone() + }; + self.state = ProtocolState::Disabled { + shutdown: shutdown.into_iter().collect(), + reenable: true + }; + return Some(ProtocolsHandlerEvent::Custom(event)); + } } Poll::Ready(None) => { shutdown.push(substream); diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 538532c1af..6a8619ee4e 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -142,10 +142,7 @@ pub enum RegisteredProtocolEvent { /// Diagnostic event indicating that the connection is clogged and we should avoid sending too /// many messages to it. - Clogged { - /// Copy of the messages that are within the buffer, for further diagnostic. - messages: Vec>, - }, + Clogged, } impl Stream for RegisteredProtocolSubstream @@ -183,11 +180,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin { // if you remove the fuse, then we will always return early from this function and // thus never read any message from the network. self.clogged_fuse = true; - return Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged { - messages: self.send_queue.iter() - .map(|m| m.clone().to_vec()) - .collect(), - }))) + return Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) } } else { self.clogged_fuse = false;