Make blocks per request configurable (#13824)

* Make blocks per request configurable

* Correct type

* Update docs

* Update client/cli/src/params/network_params.rs
This commit is contained in:
Aaro Altonen
2023-04-07 11:20:17 +03:00
committed by GitHub
parent 23de70a536
commit e77099c1a6
6 changed files with 81 additions and 33 deletions
@@ -17,7 +17,10 @@
//! Helper for handling (i.e. answering) block requests from a remote peer via the
//! `crate::request_responses::RequestResponsesBehaviour`.
use crate::schema::v1::{block_request::FromBlock, BlockResponse, Direction};
use crate::{
schema::v1::{block_request::FromBlock, BlockResponse, Direction},
MAX_BLOCKS_IN_RESPONSE,
};
use codec::{Decode, Encode};
use futures::{
@@ -50,7 +53,6 @@ use std::{
};
const LOG_TARGET: &str = "sync";
const MAX_BLOCKS_IN_RESPONSE: usize = 128;
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
+1 -1
View File
@@ -109,7 +109,7 @@ impl<B: BlockT> BlockCollection<B> {
pub fn needed_blocks(
&mut self,
who: PeerId,
count: usize,
count: u32,
peer_best: NumberFor<B>,
common: NumberFor<B>,
max_parallel: u32,
@@ -264,6 +264,14 @@ where
SyncOperationMode::Warp => SyncMode::Warp,
};
let max_parallel_downloads = network_config.max_parallel_downloads;
let max_blocks_per_request = if network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
{
log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE);
crate::MAX_BLOCKS_IN_RESPONSE as u32
} else {
network_config.max_blocks_per_request
};
let cache_capacity = NonZeroUsize::new(
(network_config.default_peers_set.in_peers as usize +
network_config.default_peers_set.out_peers as usize)
@@ -318,6 +326,7 @@ where
roles,
block_announce_validator,
max_parallel_downloads,
max_blocks_per_request,
warp_sync_params,
metrics_registry,
network_service.clone(),
+54 -30
View File
@@ -107,9 +107,6 @@ pub mod state_request_handler;
pub mod warp;
pub mod warp_request_handler;
/// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 64;
/// Maximum blocks to store in the import queue.
const MAX_IMPORTING_BLOCKS: usize = 2048;
@@ -147,6 +144,9 @@ const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
/// Maximum allowed size for a block announce.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
/// Maximum blocks per response.
pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128;
mod rep {
use sc_peerset::ReputationChange as Rep;
/// Reputation change when a peer sent us a message that led to a
@@ -311,6 +311,8 @@ pub struct ChainSync<B: BlockT, Client> {
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
max_parallel_downloads: u32,
/// Maximum blocks per request.
max_blocks_per_request: u32,
/// Total number of downloaded blocks.
downloaded_blocks: usize,
/// All block announcement that are currently being validated.
@@ -1403,6 +1405,7 @@ where
roles: Roles,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
max_blocks_per_request: u32,
warp_sync_params: Option<WarpSyncParams<B>>,
metrics_registry: Option<&Registry>,
network_service: service::network::NetworkServiceHandle,
@@ -1437,6 +1440,7 @@ where
allowed_requests: Default::default(),
block_announce_validator,
max_parallel_downloads,
max_blocks_per_request,
downloaded_blocks: 0,
block_announce_validation: Default::default(),
block_announce_validation_per_peer_stats: Default::default(),
@@ -2365,6 +2369,7 @@ where
let queue = &self.queue_blocks;
let allowed_requests = self.allowed_requests.take();
let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
let max_blocks_per_request = self.max_blocks_per_request;
let gap_sync = &mut self.gap_sync;
self.peers
.iter_mut()
@@ -2404,6 +2409,7 @@ where
blocks,
attrs,
max_parallel,
max_blocks_per_request,
last_finalized,
best_queued,
) {
@@ -2430,6 +2436,7 @@ where
client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
}
},
max_blocks_per_request,
) {
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
peer.state = PeerSyncState::DownloadingStale(hash);
@@ -2442,6 +2449,7 @@ where
attrs,
sync.target,
sync.best_queued_number,
max_blocks_per_request,
)
}) {
peer.state = PeerSyncState::DownloadingGap(range.start);
@@ -2910,6 +2918,7 @@ fn peer_block_request<B: BlockT>(
blocks: &mut BlockCollection<B>,
attrs: BlockAttributes,
max_parallel_downloads: u32,
max_blocks_per_request: u32,
finalized: NumberFor<B>,
best_num: NumberFor<B>,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
@@ -2925,7 +2934,7 @@ fn peer_block_request<B: BlockT>(
}
let range = blocks.needed_blocks(
*id,
MAX_BLOCKS_TO_REQUEST,
max_blocks_per_request,
peer.best_number,
peer.common_number,
max_parallel_downloads,
@@ -2960,10 +2969,11 @@ fn peer_gap_block_request<B: BlockT>(
attrs: BlockAttributes,
target: NumberFor<B>,
common_number: NumberFor<B>,
max_blocks_per_request: u32,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
let range = blocks.needed_blocks(
*id,
MAX_BLOCKS_TO_REQUEST,
max_blocks_per_request,
std::cmp::min(peer.best_number, target),
common_number,
1,
@@ -2992,6 +3002,7 @@ fn fork_sync_request<B: BlockT>(
finalized: NumberFor<B>,
attributes: BlockAttributes,
check_block: impl Fn(&B::Hash) -> BlockStatus,
max_blocks_per_request: u32,
) -> Option<(B::Hash, BlockRequest<B>)> {
targets.retain(|hash, r| {
if r.number <= finalized {
@@ -3011,7 +3022,7 @@ fn fork_sync_request<B: BlockT>(
// Download the fork only if it is behind or not too far ahead our tip of the chain
// Otherwise it should be downloaded in full sync mode.
if r.number <= best_num ||
(r.number - best_num).saturated_into::<u32>() < MAX_BLOCKS_TO_REQUEST as u32
(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
{
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
let count = if parent_status == BlockStatus::Unknown {
@@ -3199,6 +3210,7 @@ mod test {
Roles::from(&Role::Full),
block_announce_validator,
1,
64,
None,
None,
chain_sync_network_handle,
@@ -3265,6 +3277,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
1,
64,
None,
None,
chain_sync_network_handle,
@@ -3446,6 +3459,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
None,
chain_sync_network_handle,
@@ -3572,6 +3586,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
None,
chain_sync_network_handle,
@@ -3586,6 +3601,7 @@ mod test {
let peer_id2 = PeerId::random();
let best_block = blocks.last().unwrap().clone();
let max_blocks_to_request = sync.max_blocks_per_request;
// Connect the node we will sync from
sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number())
.unwrap();
@@ -3595,8 +3611,8 @@ mod test {
while best_block_num < MAX_DOWNLOAD_AHEAD {
let request = get_block_request(
&mut sync,
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
MAX_BLOCKS_TO_REQUEST as u32,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
@@ -3610,14 +3626,14 @@ mod test {
let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize
),);
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
best_block_num += max_blocks_to_request as u32;
let _ = sync.on_blocks_processed(
MAX_BLOCKS_TO_REQUEST as usize,
MAX_BLOCKS_TO_REQUEST as usize,
max_blocks_to_request as usize,
max_blocks_to_request as usize,
resp_blocks
.iter()
.rev()
@@ -3675,8 +3691,8 @@ mod test {
// peer 2 as well.
get_block_request(
&mut sync,
FromBlock::Number(peer1_from + MAX_BLOCKS_TO_REQUEST as u64),
MAX_BLOCKS_TO_REQUEST as u32,
FromBlock::Number(peer1_from + max_blocks_to_request as u64),
max_blocks_to_request as u32,
&peer_id2,
);
}
@@ -3728,6 +3744,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
None,
chain_sync_network_handle,
@@ -3773,11 +3790,12 @@ mod test {
// Now request and import the fork.
let mut best_block_num = *finalized_block.header().number() as u32;
let max_blocks_to_request = sync.max_blocks_per_request;
while best_block_num < *fork_blocks.last().unwrap().header().number() as u32 - 1 {
let request = get_block_request(
&mut sync,
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
MAX_BLOCKS_TO_REQUEST as u32,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
@@ -3791,14 +3809,14 @@ mod test {
let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
OnBlockData::Import(_, blocks) if blocks.len() == sync.max_blocks_per_request as usize
),);
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
best_block_num += sync.max_blocks_per_request as u32;
let _ = sync.on_blocks_processed(
MAX_BLOCKS_TO_REQUEST as usize,
MAX_BLOCKS_TO_REQUEST as usize,
max_blocks_to_request as usize,
max_blocks_to_request as usize,
resp_blocks
.iter()
.rev()
@@ -3869,6 +3887,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
None,
chain_sync_network_handle,
@@ -3914,10 +3933,12 @@ mod test {
// Now request and import the fork.
let mut best_block_num = *finalized_block.header().number() as u32;
let max_blocks_to_request = sync.max_blocks_per_request;
let mut request = get_block_request(
&mut sync,
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
MAX_BLOCKS_TO_REQUEST as u32,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
let last_block_num = *fork_blocks.last().unwrap().header().number() as u32 - 1;
@@ -3932,18 +3953,18 @@ mod test {
let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize
),);
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
best_block_num += max_blocks_to_request as u32;
if best_block_num < last_block_num {
// make sure we're not getting a duplicate request in the time before the blocks are
// processed
request = get_block_request(
&mut sync,
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
MAX_BLOCKS_TO_REQUEST as u32,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
}
@@ -3965,16 +3986,17 @@ mod test {
// The import queue may send notifications in batches of varying size. So we simulate
// this here by splitting the batch into 2 notifications.
let max_blocks_to_request = sync.max_blocks_per_request;
let second_batch = notify_imported.split_off(notify_imported.len() / 2);
let _ = sync.on_blocks_processed(
MAX_BLOCKS_TO_REQUEST as usize,
MAX_BLOCKS_TO_REQUEST as usize,
max_blocks_to_request as usize,
max_blocks_to_request as usize,
notify_imported,
);
let _ = sync.on_blocks_processed(
MAX_BLOCKS_TO_REQUEST as usize,
MAX_BLOCKS_TO_REQUEST as usize,
max_blocks_to_request as usize,
max_blocks_to_request as usize,
second_batch,
);
@@ -4010,6 +4032,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
1,
64,
None,
None,
chain_sync_network_handle,
@@ -4055,6 +4078,7 @@ mod test {
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
1,
64,
None,
None,
chain_sync_network_handle,