diff --git a/substrate/client/network/src/block_request_handler.rs b/substrate/client/network/src/block_request_handler.rs index 8faa6a7f6c..85b4acf687 100644 --- a/substrate/client/network/src/block_request_handler.rs +++ b/substrate/client/network/src/block_request_handler.rs @@ -24,19 +24,30 @@ use crate::protocol::{message::BlockAttributes}; use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; use crate::schema::v1::block_request::FromBlock; use crate::schema::v1::{BlockResponse, Direction}; +use crate::{PeerId, ReputationChange}; use futures::channel::{mpsc, oneshot}; use futures::stream::StreamExt; use log::debug; +use lru::LruCache; use prost::Message; use sp_runtime::generic::BlockId; use sp_runtime::traits::{Block as BlockT, Header, One, Zero}; use std::cmp::min; -use std::sync::{Arc}; +use std::sync::Arc; use std::time::Duration; +use std::hash::{Hasher, Hash}; -const LOG_TARGET: &str = "block-request-handler"; +const LOG_TARGET: &str = "sync"; const MAX_BLOCKS_IN_RESPONSE: usize = 128; const MAX_BODY_BYTES: usize = 8 * 1024 * 1024; +const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; + +mod rep { + use super::ReputationChange as Rep; + + /// Reputation change when a peer sent us the same request multiple times. + pub const SAME_REQUEST: Rep = Rep::new(i32::min_value(), "Same block request multiple times"); +} /// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests. pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig { @@ -61,15 +72,45 @@ pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String { s } -/// Handler for incoming block requests from a remote peer. -pub struct BlockRequestHandler { - client: Arc>, - request_receiver: mpsc::Receiver, +/// The key for [`BlockRequestHandler::seen_requests`]. +#[derive(Eq, PartialEq)] +struct SeenRequestsKey { + peer: PeerId, + from: BlockId, + max_blocks: usize, + direction: Direction, } -impl BlockRequestHandler { +impl Hash for SeenRequestsKey { + fn hash(&self, state: &mut H) { + self.peer.hash(state); + self.max_blocks.hash(state); + self.direction.hash(state); + + match self.from { + BlockId::Hash(h) => h.hash(state), + BlockId::Number(n) => n.hash(state), + } + } +} + +/// Handler for incoming block requests from a remote peer. +pub struct BlockRequestHandler { + client: Arc>, + request_receiver: mpsc::Receiver, + /// Maps from request to number of times we have seen this request. + /// + /// This is used to check if a peer is spamming us with the same request. + seen_requests: LruCache, usize>, +} + +impl BlockRequestHandler { /// Create a new [`BlockRequestHandler`]. - pub fn new(protocol_id: &ProtocolId, client: Arc>) -> (Self, ProtocolConfig) { + pub fn new( + protocol_id: &ProtocolId, + client: Arc>, + num_peer_hint: usize, + ) -> (Self, ProtocolConfig) { // Rate of arrival multiplied with the waiting time in the queue equals the queue length. // // An average Polkadot node serves less than 5 requests per second. The 95th percentile @@ -82,7 +123,9 @@ impl BlockRequestHandler { let mut protocol_config = generate_protocol_config(protocol_id); protocol_config.inbound_queue = Some(tx); - (Self { client, request_receiver }, protocol_config) + let seen_requests = LruCache::new(num_peer_hint * 2); + + (Self { client, request_receiver, seen_requests }, protocol_config) } /// Run [`BlockRequestHandler`]. @@ -90,21 +133,23 @@ impl BlockRequestHandler { while let Some(request) = self.request_receiver.next().await { let IncomingRequest { peer, payload, pending_response } = request; - match self.handle_request(payload, pending_response) { + match self.handle_request(payload, pending_response, &peer) { Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer), Err(e) => debug!( target: LOG_TARGET, "Failed to handle block request from {}: {}", - peer, e, + peer, + e, ), } } } fn handle_request( - &self, + &mut self, payload: Vec, - pending_response: oneshot::Sender + pending_response: oneshot::Sender, + peer: &PeerId, ) -> Result<(), HandleRequestError> { let request = crate::schema::v1::BlockRequest::decode(&payload[..])?; @@ -127,16 +172,75 @@ impl BlockRequestHandler { let direction = Direction::from_i32(request.direction) .ok_or(HandleRequestError::ParseDirection)?; + + let key = SeenRequestsKey { + peer: *peer, + max_blocks, + direction, + from: from_block_id.clone(), + }; + + let mut reputation_changes = Vec::new(); + + if let Some(requests) = self.seen_requests.get_mut(&key) { + *requests = requests.saturating_add(1); + + if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER { + reputation_changes.push(rep::SAME_REQUEST); + } + } else { + self.seen_requests.put(key, 1); + } + + debug!( + target: LOG_TARGET, + "Handling block request from {}: Starting at `{:?}` with maximum blocks \ + of `{}` and direction `{:?}`.", + peer, + from_block_id, + max_blocks, + direction, + ); + let attributes = BlockAttributes::from_be_u32(request.fields)?; + + let result = if reputation_changes.is_empty() { + let block_response = self.get_block_response( + attributes, + from_block_id, + direction, + max_blocks, + )?; + + let mut data = Vec::with_capacity(block_response.encoded_len()); + block_response.encode(&mut data)?; + + Ok(data) + } else { + Err(()) + }; + + pending_response.send(OutgoingResponse { + result, + reputation_changes, + }).map_err(|_| HandleRequestError::SendResponse) + } + + fn get_block_response( + &self, + attributes: BlockAttributes, + mut block_id: BlockId, + direction: Direction, + max_blocks: usize, + ) -> Result { let get_header = attributes.contains(BlockAttributes::HEADER); let get_body = attributes.contains(BlockAttributes::BODY); let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION); let mut blocks = Vec::new(); - let mut block_id = from_block_id; let mut total_size: usize = 0; - while let Some(header) = self.client.header(block_id).unwrap_or(None) { + while let Some(header) = self.client.header(block_id).unwrap_or_default() { let number = *header.number(); let hash = header.hash(); let parent_hash = *header.parent_hash(); @@ -153,7 +257,7 @@ impl BlockRequestHandler { .map(|extrinsic| extrinsic.encode()) .collect(), None => { - log::trace!(target: "sync", "Missing data for block request."); + log::trace!(target: LOG_TARGET, "Missing data for block request."); break; } } @@ -195,15 +299,7 @@ impl BlockRequestHandler { } } - let res = BlockResponse { blocks }; - - let mut data = Vec::with_capacity(res.encoded_len()); - res.encode(&mut data)?; - - pending_response.send(OutgoingResponse { - result: Ok(data), - reputation_changes: Vec::new(), - }).map_err(|_| HandleRequestError::SendResponse) + Ok(BlockResponse { blocks }) } } diff --git a/substrate/client/network/src/gossip/tests.rs b/substrate/client/network/src/gossip/tests.rs index c35159168d..89ad5fcf04 100644 --- a/substrate/client/network/src/gossip/tests.rs +++ b/substrate/client/network/src/gossip/tests.rs @@ -99,6 +99,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) let (handler, protocol_config) = BlockRequestHandler::new( &protocol_id, client.clone(), + 50, ); async_std::task::spawn(handler.run().boxed()); protocol_config diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs index defb9213a3..660eac82c4 100644 --- a/substrate/client/network/src/service/tests.rs +++ b/substrate/client/network/src/service/tests.rs @@ -99,6 +99,7 @@ fn build_test_full_node(config: config::NetworkConfiguration) let (handler, protocol_config) = BlockRequestHandler::new( &protocol_id, client.clone(), + 50, ); async_std::task::spawn(handler.run().boxed()); protocol_config diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 6e2380b284..c8b442d0dd 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -727,7 +727,11 @@ pub trait TestNetFactory: Sized { let protocol_id = ProtocolId::from("test-protocol-name"); let block_request_protocol_config = { - let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone()); + let (handler, protocol_config) = BlockRequestHandler::new( + &protocol_id, + client.clone(), + 50, + ); self.spawn_task(handler.run().boxed()); protocol_config }; diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 103e499a58..d0fa10a44d 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -883,6 +883,8 @@ pub fn build_network( let (handler, protocol_config) = BlockRequestHandler::new( &protocol_id, client.clone(), + config.network.default_peers_set.in_peers as usize + + config.network.default_peers_set.out_peers as usize, ); spawn_handle.spawn("block_request_handler", handler.run()); protocol_config