From b692954c6ed2d4d371153ac706b64aea20e02e36 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 26 Jan 2021 11:45:41 +0100 Subject: [PATCH] client/network: Report reputation changes via response (#7958) * client/network: Report reputation changes via response When handling a request by a remote peer in a request response handler, one might want to in- or de-crease the reputation of the peer. E.g. one might want to decrease the reputation slightly for each request, given that it forces the local node to do work, or one might want to issue a larger reputation change due to a malformed request by the remote peer. Instead of having to pass a peerset handle to each request response handler, this commit suggests to allow handlers to isssue reputation changes via the provided `pending_response` `oneshot` channel. A reputation change issued by a request response handler via the `pending_response` channel is received by the `RequestResponsesBehaviour` which passes the reputation change up as an event to eventually be send to a peerset via a peerset handle. * client/network/req-resp: Use Vec::new instead of None::> * client/network: Rename Response to OutgoingResponse Given that a request-response request is not called `Request` but `InomingRequest`, rename a request-response response to `OutgoingResponse`. * client/finality-grandpa-warp: Send empty rep change via response --- .../finality-grandpa-warp-sync/src/lib.rs | 10 +- substrate/client/network/src/behaviour.rs | 5 + .../network/src/block_request_handler.rs | 10 +- substrate/client/network/src/config.rs | 6 +- .../client/network/src/request_responses.rs | 96 +++++++++++++++---- 5 files changed, 97 insertions(+), 30 deletions(-) diff --git a/substrate/client/finality-grandpa-warp-sync/src/lib.rs b/substrate/client/finality-grandpa-warp-sync/src/lib.rs index d22d74c2fa..cae28173f0 100644 --- a/substrate/client/finality-grandpa-warp-sync/src/lib.rs +++ b/substrate/client/finality-grandpa-warp-sync/src/lib.rs @@ -18,7 +18,7 @@ //! [`crate::request_responses::RequestResponsesBehaviour`]. use codec::Decode; -use sc_network::config::{ProtocolId, IncomingRequest, RequestResponseConfig}; +use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig}; use sc_client_api::Backend; use sp_runtime::traits::NumberFor; use futures::channel::{mpsc, oneshot}; @@ -113,7 +113,7 @@ impl> GrandpaWarpSyncRequestHandler, - pending_response: oneshot::Sender> + pending_response: oneshot::Sender ) -> Result<(), HandleRequestError> where NumberFor: sc_finality_grandpa::BlockNumberOps, { @@ -124,8 +124,10 @@ impl> GrandpaWarpSyncRequestHandler NetworkBehaviourEventProcess { + for change in changes { + self.substrate.report_peer(peer, change); + } + } } } } diff --git a/substrate/client/network/src/block_request_handler.rs b/substrate/client/network/src/block_request_handler.rs index c88be52ecf..1a6c09eff1 100644 --- a/substrate/client/network/src/block_request_handler.rs +++ b/substrate/client/network/src/block_request_handler.rs @@ -21,7 +21,7 @@ use codec::{Encode, Decode}; use crate::chain::Client; use crate::config::ProtocolId; use crate::protocol::{message::BlockAttributes}; -use crate::request_responses::{IncomingRequest, ProtocolConfig}; +use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; use crate::schema::v1::block_request::FromBlock; use crate::schema::v1::{BlockResponse, Direction}; use futures::channel::{mpsc, oneshot}; @@ -85,7 +85,7 @@ impl BlockRequestHandler { fn handle_request( &self, payload: Vec, - pending_response: oneshot::Sender> + pending_response: oneshot::Sender ) -> Result<(), HandleRequestError> { let request = crate::schema::v1::BlockRequest::decode(&payload[..])?; @@ -181,8 +181,10 @@ impl BlockRequestHandler { let mut data = Vec::with_capacity(res.encoded_len()); res.encode(&mut data)?; - pending_response.send(data) - .map_err(|_| HandleRequestError::SendResponse) + pending_response.send(OutgoingResponse { + result: Ok(data), + reputation_changes: Vec::new(), + }).map_err(|_| HandleRequestError::SendResponse) } /// Run [`BlockRequestHandler`]. diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index c0e2c66482..29d238c368 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -23,7 +23,11 @@ pub use crate::chain::Client; pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand}; -pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig}; +pub use crate::request_responses::{ + IncomingRequest, + OutgoingResponse, + ProtocolConfig as RequestResponseConfig, +}; pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; // Note: this re-export shouldn't be part of the public API of the crate and will be removed in diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 575f483b09..9170644c3f 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -55,6 +55,7 @@ use std::{ pin::Pin, task::{Context, Poll}, time::Duration, }; use wasm_timer::Instant; +use crate::ReputationChange; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; @@ -114,8 +115,27 @@ pub struct IncomingRequest { /// [`ProtocolConfig::max_request_size`]. pub payload: Vec, - /// Channel to send back the response to. - pub pending_response: oneshot::Sender>, + /// Channel to send back the response. + /// + /// There are two ways to indicate that handling the request failed: + /// + /// 1. Drop `pending_response` and thus not changing the reputation of the peer. + /// + /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for + /// the given peer. + pub pending_response: oneshot::Sender, +} + +/// Response for an incoming request to be send by a request protocol handler. +#[derive(Debug)] +pub struct OutgoingResponse { + /// The payload of the response. + /// + /// `Err(())` if none is available e.g. due an error while handling the request. + pub result: Result, ()>, + /// Reputation changes accrued while handling the request. To be applied to the reputation of + /// the peer sending the request. + pub reputation_changes: Vec, } /// Event generated by the [`RequestResponsesBehaviour`]. @@ -150,6 +170,12 @@ pub enum Event { /// Result of the request. result: Result<(), RequestFailure> }, + + /// A request protocol handler issued reputation changes for the given peer. + ReputationChanges { + peer: PeerId, + changes: Vec, + } } /// Combination of a protocol name and a request id. @@ -198,10 +224,11 @@ pub struct RequestResponsesBehaviour { /// Generated by the response builder and waiting to be processed. struct RequestProcessingOutcome { + peer: PeerId, request_id: RequestId, protocol: Cow<'static, str>, inner_channel: ResponseChannel, ()>>, - response: Vec, + response: OutgoingResponse, } impl RequestResponsesBehaviour { @@ -406,30 +433,45 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { + peer, request_id, protocol: protocol_name, inner_channel, - response + response: OutgoingResponse { + result, + reputation_changes, + }, } = match outcome { Some(outcome) => outcome, - // The response builder was too busy and thus the request was dropped. This is + // The response builder was too busy or handling the request failed. This is // later on reported as a `InboundFailure::Omission`. None => continue, }; - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { - if let Err(_) = protocol.send_response(inner_channel, Ok(response)) { - // Note: Failure is handled further below when receiving `InboundFailure` - // event from `RequestResponse` behaviour. - log::debug!( - target: "sub-libp2p", - "Failed to send response for {:?} on protocol {:?} due to a \ - timeout or due to the connection to the peer being closed. \ - Dropping response", - request_id, protocol_name, - ); + if let Ok(payload) = result { + if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Err(_) = protocol.send_response(inner_channel, Ok(payload)) { + // Note: Failure is handled further below when receiving + // `InboundFailure` event from `RequestResponse` behaviour. + log::debug!( + target: "sub-libp2p", + "Failed to send response for {:?} on protocol {:?} due to a \ + timeout or due to the connection to the peer being closed. \ + Dropping response", + request_id, protocol_name, + ); + } } } + + if !reputation_changes.is_empty() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + Event::ReputationChanges{ + peer, + changes: reputation_changes, + }, + )); + } } // Poll request-responses protocols. @@ -505,7 +547,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // `InboundFailure::Omission` event. if let Ok(response) = rx.await { Some(RequestProcessingOutcome { - request_id, protocol, inner_channel: channel, response + peer, request_id, protocol, inner_channel: channel, response }) } else { None @@ -851,7 +893,10 @@ mod tests { pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(b"this is a response".to_vec()); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + }); } }.boxed().into()).unwrap(); @@ -934,7 +979,10 @@ mod tests { pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec()); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this response exceeds the limit".to_vec()), + reputation_changes: Vec::new(), + }); } }.boxed().into()).unwrap(); @@ -1100,11 +1148,17 @@ mod tests { protocol_1_request.unwrap() .pending_response - .send(b"this is a response".to_vec()) + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + }) .unwrap(); protocol_2_request.unwrap() .pending_response - .send(b"this is a response".to_vec()) + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + }) .unwrap(); }.boxed().into()).unwrap();