|
|
|
@@ -24,19 +24,30 @@ use crate::protocol::{message::BlockAttributes};
|
|
|
|
|
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
|
|
|
|
|
use crate::schema::v1::block_request::FromBlock;
|
|
|
|
|
use crate::schema::v1::{BlockResponse, Direction};
|
|
|
|
|
use crate::{PeerId, ReputationChange};
|
|
|
|
|
use futures::channel::{mpsc, oneshot};
|
|
|
|
|
use futures::stream::StreamExt;
|
|
|
|
|
use log::debug;
|
|
|
|
|
use lru::LruCache;
|
|
|
|
|
use prost::Message;
|
|
|
|
|
use sp_runtime::generic::BlockId;
|
|
|
|
|
use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
|
|
|
|
|
use std::cmp::min;
|
|
|
|
|
use std::sync::{Arc};
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
use std::hash::{Hasher, Hash};
|
|
|
|
|
|
|
|
|
|
const LOG_TARGET: &str = "block-request-handler";
|
|
|
|
|
const LOG_TARGET: &str = "sync";
|
|
|
|
|
const MAX_BLOCKS_IN_RESPONSE: usize = 128;
|
|
|
|
|
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
|
|
|
|
|
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
|
|
|
|
|
|
|
|
|
|
mod rep {
|
|
|
|
|
use super::ReputationChange as Rep;
|
|
|
|
|
|
|
|
|
|
/// Reputation change when a peer sent us the same request multiple times.
|
|
|
|
|
pub const SAME_REQUEST: Rep = Rep::new(i32::min_value(), "Same block request multiple times");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
|
|
|
|
|
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
|
|
|
|
@@ -61,15 +72,45 @@ pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
|
|
|
|
|
s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handler for incoming block requests from a remote peer.
|
|
|
|
|
pub struct BlockRequestHandler<B> {
|
|
|
|
|
client: Arc<dyn Client<B>>,
|
|
|
|
|
request_receiver: mpsc::Receiver<IncomingRequest>,
|
|
|
|
|
/// The key for [`BlockRequestHandler::seen_requests`].
|
|
|
|
|
#[derive(Eq, PartialEq)]
|
|
|
|
|
struct SeenRequestsKey<B: BlockT> {
|
|
|
|
|
peer: PeerId,
|
|
|
|
|
from: BlockId<B>,
|
|
|
|
|
max_blocks: usize,
|
|
|
|
|
direction: Direction,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl <B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
impl<B: BlockT> Hash for SeenRequestsKey<B> {
|
|
|
|
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
|
|
|
|
self.peer.hash(state);
|
|
|
|
|
self.max_blocks.hash(state);
|
|
|
|
|
self.direction.hash(state);
|
|
|
|
|
|
|
|
|
|
match self.from {
|
|
|
|
|
BlockId::Hash(h) => h.hash(state),
|
|
|
|
|
BlockId::Number(n) => n.hash(state),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handler for incoming block requests from a remote peer.
|
|
|
|
|
pub struct BlockRequestHandler<B: BlockT> {
|
|
|
|
|
client: Arc<dyn Client<B>>,
|
|
|
|
|
request_receiver: mpsc::Receiver<IncomingRequest>,
|
|
|
|
|
/// Maps from request to number of times we have seen this request.
|
|
|
|
|
///
|
|
|
|
|
/// This is used to check if a peer is spamming us with the same request.
|
|
|
|
|
seen_requests: LruCache<SeenRequestsKey<B>, usize>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
/// Create a new [`BlockRequestHandler`].
|
|
|
|
|
pub fn new(protocol_id: &ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) {
|
|
|
|
|
pub fn new(
|
|
|
|
|
protocol_id: &ProtocolId,
|
|
|
|
|
client: Arc<dyn Client<B>>,
|
|
|
|
|
num_peer_hint: usize,
|
|
|
|
|
) -> (Self, ProtocolConfig) {
|
|
|
|
|
// Rate of arrival multiplied with the waiting time in the queue equals the queue length.
|
|
|
|
|
//
|
|
|
|
|
// An average Polkadot node serves less than 5 requests per second. The 95th percentile
|
|
|
|
@@ -82,7 +123,9 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
let mut protocol_config = generate_protocol_config(protocol_id);
|
|
|
|
|
protocol_config.inbound_queue = Some(tx);
|
|
|
|
|
|
|
|
|
|
(Self { client, request_receiver }, protocol_config)
|
|
|
|
|
let seen_requests = LruCache::new(num_peer_hint * 2);
|
|
|
|
|
|
|
|
|
|
(Self { client, request_receiver, seen_requests }, protocol_config)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Run [`BlockRequestHandler`].
|
|
|
|
@@ -90,21 +133,23 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
while let Some(request) = self.request_receiver.next().await {
|
|
|
|
|
let IncomingRequest { peer, payload, pending_response } = request;
|
|
|
|
|
|
|
|
|
|
match self.handle_request(payload, pending_response) {
|
|
|
|
|
match self.handle_request(payload, pending_response, &peer) {
|
|
|
|
|
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
|
|
|
|
|
Err(e) => debug!(
|
|
|
|
|
target: LOG_TARGET,
|
|
|
|
|
"Failed to handle block request from {}: {}",
|
|
|
|
|
peer, e,
|
|
|
|
|
peer,
|
|
|
|
|
e,
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn handle_request(
|
|
|
|
|
&self,
|
|
|
|
|
&mut self,
|
|
|
|
|
payload: Vec<u8>,
|
|
|
|
|
pending_response: oneshot::Sender<OutgoingResponse>
|
|
|
|
|
pending_response: oneshot::Sender<OutgoingResponse>,
|
|
|
|
|
peer: &PeerId,
|
|
|
|
|
) -> Result<(), HandleRequestError> {
|
|
|
|
|
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;
|
|
|
|
|
|
|
|
|
@@ -127,16 +172,75 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
|
|
|
|
|
let direction = Direction::from_i32(request.direction)
|
|
|
|
|
.ok_or(HandleRequestError::ParseDirection)?;
|
|
|
|
|
|
|
|
|
|
let key = SeenRequestsKey {
|
|
|
|
|
peer: *peer,
|
|
|
|
|
max_blocks,
|
|
|
|
|
direction,
|
|
|
|
|
from: from_block_id.clone(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut reputation_changes = Vec::new();
|
|
|
|
|
|
|
|
|
|
if let Some(requests) = self.seen_requests.get_mut(&key) {
|
|
|
|
|
*requests = requests.saturating_add(1);
|
|
|
|
|
|
|
|
|
|
if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
|
|
|
|
|
reputation_changes.push(rep::SAME_REQUEST);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
self.seen_requests.put(key, 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
|
target: LOG_TARGET,
|
|
|
|
|
"Handling block request from {}: Starting at `{:?}` with maximum blocks \
|
|
|
|
|
of `{}` and direction `{:?}`.",
|
|
|
|
|
peer,
|
|
|
|
|
from_block_id,
|
|
|
|
|
max_blocks,
|
|
|
|
|
direction,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let attributes = BlockAttributes::from_be_u32(request.fields)?;
|
|
|
|
|
|
|
|
|
|
let result = if reputation_changes.is_empty() {
|
|
|
|
|
let block_response = self.get_block_response(
|
|
|
|
|
attributes,
|
|
|
|
|
from_block_id,
|
|
|
|
|
direction,
|
|
|
|
|
max_blocks,
|
|
|
|
|
)?;
|
|
|
|
|
|
|
|
|
|
let mut data = Vec::with_capacity(block_response.encoded_len());
|
|
|
|
|
block_response.encode(&mut data)?;
|
|
|
|
|
|
|
|
|
|
Ok(data)
|
|
|
|
|
} else {
|
|
|
|
|
Err(())
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
pending_response.send(OutgoingResponse {
|
|
|
|
|
result,
|
|
|
|
|
reputation_changes,
|
|
|
|
|
}).map_err(|_| HandleRequestError::SendResponse)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get_block_response(
|
|
|
|
|
&self,
|
|
|
|
|
attributes: BlockAttributes,
|
|
|
|
|
mut block_id: BlockId<B>,
|
|
|
|
|
direction: Direction,
|
|
|
|
|
max_blocks: usize,
|
|
|
|
|
) -> Result<BlockResponse, HandleRequestError> {
|
|
|
|
|
let get_header = attributes.contains(BlockAttributes::HEADER);
|
|
|
|
|
let get_body = attributes.contains(BlockAttributes::BODY);
|
|
|
|
|
let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
|
|
|
|
|
|
|
|
|
|
let mut blocks = Vec::new();
|
|
|
|
|
let mut block_id = from_block_id;
|
|
|
|
|
|
|
|
|
|
let mut total_size: usize = 0;
|
|
|
|
|
while let Some(header) = self.client.header(block_id).unwrap_or(None) {
|
|
|
|
|
while let Some(header) = self.client.header(block_id).unwrap_or_default() {
|
|
|
|
|
let number = *header.number();
|
|
|
|
|
let hash = header.hash();
|
|
|
|
|
let parent_hash = *header.parent_hash();
|
|
|
|
@@ -153,7 +257,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
.map(|extrinsic| extrinsic.encode())
|
|
|
|
|
.collect(),
|
|
|
|
|
None => {
|
|
|
|
|
log::trace!(target: "sync", "Missing data for block request.");
|
|
|
|
|
log::trace!(target: LOG_TARGET, "Missing data for block request.");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -195,15 +299,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let res = BlockResponse { blocks };
|
|
|
|
|
|
|
|
|
|
let mut data = Vec::with_capacity(res.encoded_len());
|
|
|
|
|
res.encode(&mut data)?;
|
|
|
|
|
|
|
|
|
|
pending_response.send(OutgoingResponse {
|
|
|
|
|
result: Ok(data),
|
|
|
|
|
reputation_changes: Vec::new(),
|
|
|
|
|
}).map_err(|_| HandleRequestError::SendResponse)
|
|
|
|
|
Ok(BlockResponse { blocks })
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|