From 85f32e8813a934391f2c5618a25a5c9e87fd905a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 9 Mar 2021 14:43:19 +0100 Subject: [PATCH] Decrease the peer reputation on invalid block requests (#8260) * Decrease the peer reputation on invalid block requests This pr changes the block request handler to decrease the reputation of peers when they send the same request multiple times or they send us an invalid block request. * Review feedback * Change log target * Remove unused code --- .../network/src/block_request_handler.rs | 146 +++++++++++++++--- substrate/client/network/src/gossip/tests.rs | 1 + substrate/client/network/src/service/tests.rs | 1 + substrate/client/network/test/src/lib.rs | 6 +- substrate/client/service/src/builder.rs | 2 + 5 files changed, 130 insertions(+), 26 deletions(-) diff --git a/substrate/client/network/src/block_request_handler.rs b/substrate/client/network/src/block_request_handler.rs index 8faa6a7f6c..85b4acf687 100644 --- a/substrate/client/network/src/block_request_handler.rs +++ b/substrate/client/network/src/block_request_handler.rs @@ -24,19 +24,30 @@ use crate::protocol::{message::BlockAttributes}; use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; use crate::schema::v1::block_request::FromBlock; use crate::schema::v1::{BlockResponse, Direction}; +use crate::{PeerId, ReputationChange}; use futures::channel::{mpsc, oneshot}; use futures::stream::StreamExt; use log::debug; +use lru::LruCache; use prost::Message; use sp_runtime::generic::BlockId; use sp_runtime::traits::{Block as BlockT, Header, One, Zero}; use std::cmp::min; -use std::sync::{Arc}; +use std::sync::Arc; use std::time::Duration; +use std::hash::{Hasher, Hash}; -const LOG_TARGET: &str = "block-request-handler"; +const LOG_TARGET: &str = "sync"; const MAX_BLOCKS_IN_RESPONSE: usize = 128; const MAX_BODY_BYTES: usize = 8 * 1024 * 1024; +const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; + +mod rep { + use super::ReputationChange as Rep; + + /// Reputation change when a peer sent us the same request multiple times. + pub const SAME_REQUEST: Rep = Rep::new(i32::min_value(), "Same block request multiple times"); +} /// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests. pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig { @@ -61,15 +72,45 @@ pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String { s } -/// Handler for incoming block requests from a remote peer. -pub struct BlockRequestHandler { - client: Arc>, - request_receiver: mpsc::Receiver, +/// The key for [`BlockRequestHandler::seen_requests`]. +#[derive(Eq, PartialEq)] +struct SeenRequestsKey { + peer: PeerId, + from: BlockId, + max_blocks: usize, + direction: Direction, } -impl BlockRequestHandler { +impl Hash for SeenRequestsKey { + fn hash(&self, state: &mut H) { + self.peer.hash(state); + self.max_blocks.hash(state); + self.direction.hash(state); + + match self.from { + BlockId::Hash(h) => h.hash(state), + BlockId::Number(n) => n.hash(state), + } + } +} + +/// Handler for incoming block requests from a remote peer. +pub struct BlockRequestHandler { + client: Arc>, + request_receiver: mpsc::Receiver, + /// Maps from request to number of times we have seen this request. + /// + /// This is used to check if a peer is spamming us with the same request. + seen_requests: LruCache, usize>, +} + +impl BlockRequestHandler { /// Create a new [`BlockRequestHandler`]. - pub fn new(protocol_id: &ProtocolId, client: Arc>) -> (Self, ProtocolConfig) { + pub fn new( + protocol_id: &ProtocolId, + client: Arc>, + num_peer_hint: usize, + ) -> (Self, ProtocolConfig) { // Rate of arrival multiplied with the waiting time in the queue equals the queue length. // // An average Polkadot node serves less than 5 requests per second. The 95th percentile @@ -82,7 +123,9 @@ impl BlockRequestHandler { let mut protocol_config = generate_protocol_config(protocol_id); protocol_config.inbound_queue = Some(tx); - (Self { client, request_receiver }, protocol_config) + let seen_requests = LruCache::new(num_peer_hint * 2); + + (Self { client, request_receiver, seen_requests }, protocol_config) } /// Run [`BlockRequestHandler`]. @@ -90,21 +133,23 @@ impl BlockRequestHandler { while let Some(request) = self.request_receiver.next().await { let IncomingRequest { peer, payload, pending_response } = request; - match self.handle_request(payload, pending_response) { + match self.handle_request(payload, pending_response, &peer) { Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer), Err(e) => debug!( target: LOG_TARGET, "Failed to handle block request from {}: {}", - peer, e, + peer, + e, ), } } } fn handle_request( - &self, + &mut self, payload: Vec, - pending_response: oneshot::Sender + pending_response: oneshot::Sender, + peer: &PeerId, ) -> Result<(), HandleRequestError> { let request = crate::schema::v1::BlockRequest::decode(&payload[..])?; @@ -127,16 +172,75 @@ impl BlockRequestHandler { let direction = Direction::from_i32(request.direction) .ok_or(HandleRequestError::ParseDirection)?; + + let key = SeenRequestsKey { + peer: *peer, + max_blocks, + direction, + from: from_block_id.clone(), + }; + + let mut reputation_changes = Vec::new(); + + if let Some(requests) = self.seen_requests.get_mut(&key) { + *requests = requests.saturating_add(1); + + if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER { + reputation_changes.push(rep::SAME_REQUEST); + } + } else { + self.seen_requests.put(key, 1); + } + + debug!( + target: LOG_TARGET, + "Handling block request from {}: Starting at `{:?}` with maximum blocks \ + of `{}` and direction `{:?}`.", + peer, + from_block_id, + max_blocks, + direction, + ); + let attributes = BlockAttributes::from_be_u32(request.fields)?; + + let result = if reputation_changes.is_empty() { + let block_response = self.get_block_response( + attributes, + from_block_id, + direction, + max_blocks, + )?; + + let mut data = Vec::with_capacity(block_response.encoded_len()); + block_response.encode(&mut data)?; + + Ok(data) + } else { + Err(()) + }; + + pending_response.send(OutgoingResponse { + result, + reputation_changes, + }).map_err(|_| HandleRequestError::SendResponse) + } + + fn get_block_response( + &self, + attributes: BlockAttributes, + mut block_id: BlockId, + direction: Direction, + max_blocks: usize, + ) -> Result { let get_header = attributes.contains(BlockAttributes::HEADER); let get_body = attributes.contains(BlockAttributes::BODY); let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION); let mut blocks = Vec::new(); - let mut block_id = from_block_id; let mut total_size: usize = 0; - while let Some(header) = self.client.header(block_id).unwrap_or(None) { + while let Some(header) = self.client.header(block_id).unwrap_or_default() { let number = *header.number(); let hash = header.hash(); let parent_hash = *header.parent_hash(); @@ -153,7 +257,7 @@ impl BlockRequestHandler { .map(|extrinsic| extrinsic.encode()) .collect(), None => { - log::trace!(target: "sync", "Missing data for block request."); + log::trace!(target: LOG_TARGET, "Missing data for block request."); break; } } @@ -195,15 +299,7 @@ impl BlockRequestHandler { } } - let res = BlockResponse { blocks }; - - let mut data = Vec::with_capacity(res.encoded_len()); - res.encode(&mut data)?; - - pending_response.send(OutgoingResponse { - result: Ok(data), - reputation_changes: Vec::new(), - }).map_err(|_| HandleRequestError::SendResponse) + Ok(BlockResponse { blocks }) } } diff --git a/substrate/client/network/src/gossip/tests.rs b/substrate/client/network/src/gossip/tests.rs index c35159168d..89ad5fcf04 100644 --- a/substrate/client/network/src/gossip/tests.rs +++ b/substrate/client/network/src/gossip/tests.rs @@ -99,6 +99,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) let (handler, protocol_config) = BlockRequestHandler::new( &protocol_id, client.clone(), + 50, ); async_std::task::spawn(handler.run().boxed()); protocol_config diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs index defb9213a3..660eac82c4 100644 --- a/substrate/client/network/src/service/tests.rs +++ b/substrate/client/network/src/service/tests.rs @@ -99,6 +99,7 @@ fn build_test_full_node(config: config::NetworkConfiguration) let (handler, protocol_config) = BlockRequestHandler::new( &protocol_id, client.clone(), + 50, ); async_std::task::spawn(handler.run().boxed()); protocol_config diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 6e2380b284..c8b442d0dd 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -727,7 +727,11 @@ pub trait TestNetFactory: Sized { let protocol_id = ProtocolId::from("test-protocol-name"); let block_request_protocol_config = { - let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone()); + let (handler, protocol_config) = BlockRequestHandler::new( + &protocol_id, + client.clone(), + 50, + ); self.spawn_task(handler.run().boxed()); protocol_config }; diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 103e499a58..d0fa10a44d 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -883,6 +883,8 @@ pub fn build_network( let (handler, protocol_config) = BlockRequestHandler::new( &protocol_id, client.clone(), + config.network.default_peers_set.in_peers as usize + + config.network.default_peers_set.out_peers as usize, ); spawn_handle.spawn("block_request_handler", handler.run()); protocol_config