diff --git a/substrate/client/finality-grandpa-warp-sync/src/lib.rs b/substrate/client/finality-grandpa-warp-sync/src/lib.rs index d22d74c2fa..cae28173f0 100644 --- a/substrate/client/finality-grandpa-warp-sync/src/lib.rs +++ b/substrate/client/finality-grandpa-warp-sync/src/lib.rs @@ -18,7 +18,7 @@ //! [`crate::request_responses::RequestResponsesBehaviour`]. use codec::Decode; -use sc_network::config::{ProtocolId, IncomingRequest, RequestResponseConfig}; +use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig}; use sc_client_api::Backend; use sp_runtime::traits::NumberFor; use futures::channel::{mpsc, oneshot}; @@ -113,7 +113,7 @@ impl> GrandpaWarpSyncRequestHandler, - pending_response: oneshot::Sender> + pending_response: oneshot::Sender ) -> Result<(), HandleRequestError> where NumberFor: sc_finality_grandpa::BlockNumberOps, { @@ -124,8 +124,10 @@ impl> GrandpaWarpSyncRequestHandler NetworkBehaviourEventProcess { + for change in changes { + self.substrate.report_peer(peer, change); + } + } } } } diff --git a/substrate/client/network/src/block_request_handler.rs b/substrate/client/network/src/block_request_handler.rs index c88be52ecf..1a6c09eff1 100644 --- a/substrate/client/network/src/block_request_handler.rs +++ b/substrate/client/network/src/block_request_handler.rs @@ -21,7 +21,7 @@ use codec::{Encode, Decode}; use crate::chain::Client; use crate::config::ProtocolId; use crate::protocol::{message::BlockAttributes}; -use crate::request_responses::{IncomingRequest, ProtocolConfig}; +use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; use crate::schema::v1::block_request::FromBlock; use crate::schema::v1::{BlockResponse, Direction}; use futures::channel::{mpsc, oneshot}; @@ -85,7 +85,7 @@ impl BlockRequestHandler { fn handle_request( &self, payload: Vec, - pending_response: oneshot::Sender> + pending_response: oneshot::Sender ) -> Result<(), HandleRequestError> { let request = crate::schema::v1::BlockRequest::decode(&payload[..])?; @@ -181,8 +181,10 @@ impl BlockRequestHandler { let mut data = Vec::with_capacity(res.encoded_len()); res.encode(&mut data)?; - pending_response.send(data) - .map_err(|_| HandleRequestError::SendResponse) + pending_response.send(OutgoingResponse { + result: Ok(data), + reputation_changes: Vec::new(), + }).map_err(|_| HandleRequestError::SendResponse) } /// Run [`BlockRequestHandler`]. diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index c0e2c66482..29d238c368 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -23,7 +23,11 @@ pub use crate::chain::Client; pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand}; -pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig}; +pub use crate::request_responses::{ + IncomingRequest, + OutgoingResponse, + ProtocolConfig as RequestResponseConfig, +}; pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; // Note: this re-export shouldn't be part of the public API of the crate and will be removed in diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 575f483b09..9170644c3f 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -55,6 +55,7 @@ use std::{ pin::Pin, task::{Context, Poll}, time::Duration, }; use wasm_timer::Instant; +use crate::ReputationChange; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; @@ -114,8 +115,27 @@ pub struct IncomingRequest { /// [`ProtocolConfig::max_request_size`]. pub payload: Vec, - /// Channel to send back the response to. - pub pending_response: oneshot::Sender>, + /// Channel to send back the response. + /// + /// There are two ways to indicate that handling the request failed: + /// + /// 1. Drop `pending_response` and thus not changing the reputation of the peer. + /// + /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for + /// the given peer. + pub pending_response: oneshot::Sender, +} + +/// Response for an incoming request to be send by a request protocol handler. +#[derive(Debug)] +pub struct OutgoingResponse { + /// The payload of the response. + /// + /// `Err(())` if none is available e.g. due an error while handling the request. + pub result: Result, ()>, + /// Reputation changes accrued while handling the request. To be applied to the reputation of + /// the peer sending the request. + pub reputation_changes: Vec, } /// Event generated by the [`RequestResponsesBehaviour`]. @@ -150,6 +170,12 @@ pub enum Event { /// Result of the request. result: Result<(), RequestFailure> }, + + /// A request protocol handler issued reputation changes for the given peer. + ReputationChanges { + peer: PeerId, + changes: Vec, + } } /// Combination of a protocol name and a request id. @@ -198,10 +224,11 @@ pub struct RequestResponsesBehaviour { /// Generated by the response builder and waiting to be processed. struct RequestProcessingOutcome { + peer: PeerId, request_id: RequestId, protocol: Cow<'static, str>, inner_channel: ResponseChannel, ()>>, - response: Vec, + response: OutgoingResponse, } impl RequestResponsesBehaviour { @@ -406,30 +433,45 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { + peer, request_id, protocol: protocol_name, inner_channel, - response + response: OutgoingResponse { + result, + reputation_changes, + }, } = match outcome { Some(outcome) => outcome, - // The response builder was too busy and thus the request was dropped. This is + // The response builder was too busy or handling the request failed. This is // later on reported as a `InboundFailure::Omission`. None => continue, }; - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { - if let Err(_) = protocol.send_response(inner_channel, Ok(response)) { - // Note: Failure is handled further below when receiving `InboundFailure` - // event from `RequestResponse` behaviour. - log::debug!( - target: "sub-libp2p", - "Failed to send response for {:?} on protocol {:?} due to a \ - timeout or due to the connection to the peer being closed. \ - Dropping response", - request_id, protocol_name, - ); + if let Ok(payload) = result { + if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Err(_) = protocol.send_response(inner_channel, Ok(payload)) { + // Note: Failure is handled further below when receiving + // `InboundFailure` event from `RequestResponse` behaviour. + log::debug!( + target: "sub-libp2p", + "Failed to send response for {:?} on protocol {:?} due to a \ + timeout or due to the connection to the peer being closed. \ + Dropping response", + request_id, protocol_name, + ); + } } } + + if !reputation_changes.is_empty() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + Event::ReputationChanges{ + peer, + changes: reputation_changes, + }, + )); + } } // Poll request-responses protocols. @@ -505,7 +547,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // `InboundFailure::Omission` event. if let Ok(response) = rx.await { Some(RequestProcessingOutcome { - request_id, protocol, inner_channel: channel, response + peer, request_id, protocol, inner_channel: channel, response }) } else { None @@ -851,7 +893,10 @@ mod tests { pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(b"this is a response".to_vec()); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + }); } }.boxed().into()).unwrap(); @@ -934,7 +979,10 @@ mod tests { pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec()); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this response exceeds the limit".to_vec()), + reputation_changes: Vec::new(), + }); } }.boxed().into()).unwrap(); @@ -1100,11 +1148,17 @@ mod tests { protocol_1_request.unwrap() .pending_response - .send(b"this is a response".to_vec()) + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + }) .unwrap(); protocol_2_request.unwrap() .pending_response - .send(b"this is a response".to_vec()) + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + }) .unwrap(); }.boxed().into()).unwrap();