mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-30 18:57:57 +00:00
Remove legacy Clogged event (#6652)
This commit is contained in:
@@ -99,9 +99,6 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
|
||||
|
||||
mod rep {
|
||||
use sc_peerset::ReputationChange as Rep;
|
||||
/// Reputation change when a peer is "clogged", meaning that it's not fast enough to process our
|
||||
/// messages.
|
||||
pub const CLOGGED_PEER: Rep = Rep::new(-(1 << 12), "Clogged message queue");
|
||||
/// Reputation change when a peer doesn't respond in time to our messages.
|
||||
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
|
||||
/// Reputation change when we are a light client and a peer is behind us.
|
||||
@@ -742,22 +739,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Called as a back-pressure mechanism if the networking detects that the peer cannot process
|
||||
/// our messaging rate fast enough.
|
||||
pub fn on_clogged_peer(&self, who: PeerId, _msg: Option<Message<B>>) {
|
||||
self.peerset_handle.report_peer(who.clone(), rep::CLOGGED_PEER);
|
||||
|
||||
// Print some diagnostics.
|
||||
if let Some(peer) = self.context_data.peers.get(&who) {
|
||||
debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
|
||||
known_transactions: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
|
||||
who, peer.info.protocol_version, peer.info.roles, peer.known_transactions, peer.known_blocks,
|
||||
peer.info.best_hash, peer.info.best_number);
|
||||
} else {
|
||||
debug!(target: "sync", "Peer clogged before being properly connected");
|
||||
}
|
||||
}
|
||||
|
||||
fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
|
||||
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
|
||||
request.id,
|
||||
@@ -2101,15 +2082,6 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
}
|
||||
GenericProtoOut::Clogged { peer_id, messages } => {
|
||||
debug!(target: "sync", "{} clogging messages:", messages.len());
|
||||
for msg in messages.into_iter().take(5) {
|
||||
let message: Option<Message<B>> = Decode::decode(&mut &msg[..]).ok();
|
||||
debug!(target: "sync", "{:?}", message);
|
||||
self.on_clogged_peer(peer_id.clone(), message);
|
||||
}
|
||||
CustomMessageOutcome::None
|
||||
}
|
||||
};
|
||||
|
||||
if let CustomMessageOutcome::None = outcome {
|
||||
|
||||
@@ -310,15 +310,6 @@ pub enum GenericProtoOut {
|
||||
/// Message that has been received.
|
||||
message: BytesMut,
|
||||
},
|
||||
|
||||
/// The substream used by the protocol is pretty large. We should print avoid sending more
|
||||
/// messages on it if possible.
|
||||
Clogged {
|
||||
/// Id of the peer which is clogged.
|
||||
peer_id: PeerId,
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<Vec<u8>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl GenericProto {
|
||||
@@ -1312,18 +1303,6 @@ impl NetworkBehaviour for GenericProto {
|
||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
|
||||
}
|
||||
|
||||
NotifsHandlerOut::Clogged { messages } => {
|
||||
debug_assert!(self.is_open(&source));
|
||||
trace!(target: "sub-libp2p", "Handler({:?}) => Clogged", source);
|
||||
trace!(target: "sub-libp2p", "External API <= Clogged({:?})", source);
|
||||
warn!(target: "sub-libp2p", "Queue of packets to send to {:?} is \
|
||||
pretty large", source);
|
||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(GenericProtoOut::Clogged {
|
||||
peer_id: source,
|
||||
messages,
|
||||
}));
|
||||
}
|
||||
|
||||
// Don't do anything for non-severe errors except report them.
|
||||
NotifsHandlerOut::ProtocolError { is_severe, ref error } if !is_severe => {
|
||||
debug!(target: "sub-libp2p", "Handler({:?}) => Benign protocol error: {:?}",
|
||||
|
||||
@@ -215,13 +215,6 @@ pub enum NotifsHandlerOut {
|
||||
message: BytesMut,
|
||||
},
|
||||
|
||||
/// A substream to the remote is clogged. The send buffer is very large, and we should print
|
||||
/// a diagnostic message and/or avoid sending more data.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<Vec<u8>>,
|
||||
},
|
||||
|
||||
/// An error has happened on the protocol level with this node.
|
||||
ProtocolError {
|
||||
/// If true the error is severe, such as a protocol violation.
|
||||
@@ -484,10 +477,6 @@ impl ProtocolsHandler for NotifsHandler {
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::CustomMessage { message }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::Clogged { messages }
|
||||
)),
|
||||
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
|
||||
Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
NotifsHandlerOut::ProtocolError { is_severe, error }
|
||||
|
||||
@@ -236,13 +236,6 @@ pub enum LegacyProtoHandlerOut {
|
||||
message: BytesMut,
|
||||
},
|
||||
|
||||
/// A substream to the remote is clogged. The send buffer is very large, and we should print
|
||||
/// a diagnostic message and/or avoid sending more data.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<Vec<u8>>,
|
||||
},
|
||||
|
||||
/// An error has happened on the protocol level with this node.
|
||||
ProtocolError {
|
||||
/// If true the error is severe, such as a protocol violation.
|
||||
@@ -395,13 +388,19 @@ impl LegacyProtoHandler {
|
||||
self.state = ProtocolState::Normal { substreams, shutdown };
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
},
|
||||
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged { messages }))) => {
|
||||
let event = LegacyProtoHandlerOut::Clogged {
|
||||
messages,
|
||||
};
|
||||
substreams.push(substream);
|
||||
self.state = ProtocolState::Normal { substreams, shutdown };
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged))) => {
|
||||
shutdown.push(substream);
|
||||
if substreams.is_empty() {
|
||||
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
|
||||
reason: "Legacy substream clogged".into(),
|
||||
endpoint: self.endpoint.clone()
|
||||
};
|
||||
self.state = ProtocolState::Disabled {
|
||||
shutdown: shutdown.into_iter().collect(),
|
||||
reenable: true
|
||||
};
|
||||
return Some(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
shutdown.push(substream);
|
||||
|
||||
@@ -142,10 +142,7 @@ pub enum RegisteredProtocolEvent {
|
||||
|
||||
/// Diagnostic event indicating that the connection is clogged and we should avoid sending too
|
||||
/// many messages to it.
|
||||
Clogged {
|
||||
/// Copy of the messages that are within the buffer, for further diagnostic.
|
||||
messages: Vec<Vec<u8>>,
|
||||
},
|
||||
Clogged,
|
||||
}
|
||||
|
||||
impl<TSubstream> Stream for RegisteredProtocolSubstream<TSubstream>
|
||||
@@ -183,11 +180,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin {
|
||||
// if you remove the fuse, then we will always return early from this function and
|
||||
// thus never read any message from the network.
|
||||
self.clogged_fuse = true;
|
||||
return Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged {
|
||||
messages: self.send_queue.iter()
|
||||
.map(|m| m.clone().to_vec())
|
||||
.collect(),
|
||||
})))
|
||||
return Poll::Ready(Some(Ok(RegisteredProtocolEvent::Clogged)))
|
||||
}
|
||||
} else {
|
||||
self.clogged_fuse = false;
|
||||
|
||||
Reference in New Issue
Block a user