Avoid a duplicate block request when syncing from a fork (#11094)

* Separate queueing blocks for import from removal

* Add regression tests

* Remove unnecessary log

* Clear queued blocks when processed

* Move check out of match block

* Track queued block ranges

* Update client/network/sync/src/blocks.rs

* Update client/network/sync/src/blocks.rs

* Update client/network/sync/src/blocks.rs

* Update client/network/sync/src/blocks.rs

* FMT

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Bastian Köcher <info@kchr.de>
This commit is contained in:
Nathan Whitaker
2022-06-24 18:21:07 -04:00
committed by GitHub
parent 025e1d76f1
commit 421b4b63d0
3 changed files with 282 additions and 44 deletions
+88 -28
View File
@@ -18,7 +18,7 @@
use crate::message;
use libp2p::PeerId;
use log::trace;
use log::{debug, trace};
use sp_runtime::traits::{Block as BlockT, NumberFor, One};
use std::{
cmp,
@@ -39,6 +39,7 @@ pub struct BlockData<B: BlockT> {
enum BlockRangeState<B: BlockT> {
Downloading { len: NumberFor<B>, downloading: u32 },
Complete(Vec<BlockData<B>>),
Queued { len: NumberFor<B> },
}
impl<B: BlockT> BlockRangeState<B> {
@@ -46,6 +47,7 @@ impl<B: BlockT> BlockRangeState<B> {
match *self {
Self::Downloading { len, .. } => len,
Self::Complete(ref blocks) => (blocks.len() as u32).into(),
Self::Queued { len } => len,
}
}
}
@@ -56,12 +58,19 @@ pub struct BlockCollection<B: BlockT> {
/// Downloaded blocks.
blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
peer_requests: HashMap<PeerId, NumberFor<B>>,
/// Block ranges downloaded and queued for import.
/// Maps start_hash => (start_num, end_num).
queued_blocks: HashMap<B::Hash, (NumberFor<B>, NumberFor<B>)>,
}
impl<B: BlockT> BlockCollection<B> {
/// Create a new instance.
pub fn new() -> Self {
Self { blocks: BTreeMap::new(), peer_requests: HashMap::new() }
Self {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
queued_blocks: HashMap::new(),
}
}
/// Clear everything.
@@ -170,29 +179,52 @@ impl<B: BlockT> BlockCollection<B> {
}
/// Get a valid chain of blocks ordered in descending order and ready for importing into
/// blockchain.
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
/// the blockchain.
pub fn ready_blocks(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut ready = Vec::new();
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
BlockRangeState::Complete(blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
// Remove all elements from `blocks` and add them to `drained`
drained.append(blocks);
ranges.push(*start);
},
_ => break,
for (&start, range_data) in &mut self.blocks {
if start > prev {
break
}
let len = match range_data {
BlockRangeState::Complete(blocks) => {
let len = (blocks.len() as u32).into();
prev = start + len;
// Remove all elements from `blocks` and add them to `ready`
ready.append(blocks);
len
},
BlockRangeState::Queued { .. } => continue,
_ => break,
};
*range_data = BlockRangeState::Queued { len };
}
for r in ranges {
self.blocks.remove(&r);
if let Some(BlockData { block, .. }) = ready.first() {
self.queued_blocks
.insert(block.hash, (from, from + (ready.len() as u32).into()));
}
trace!(target: "sync", "{} blocks ready for import", ready.len());
ready
}
pub fn clear_queued(&mut self, from_hash: &B::Hash) {
match self.queued_blocks.remove(from_hash) {
None => {
debug!(target: "sync", "Can't clear unknown queued blocks from {:?}", from_hash);
},
Some((from, to)) => {
let mut block_num = from;
while block_num < to {
self.blocks.remove(&block_num);
block_num += One::one();
}
trace!(target: "sync", "Cleared blocks from {:?} to {:?}", from, to);
},
}
trace!(target: "sync", "Drained {} blocks from {:?}", drained.len(), from);
drained
}
pub fn clear_peer_download(&mut self, who: &PeerId) {
@@ -268,14 +300,14 @@ mod test {
bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.ready_blocks(1), vec![]);
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(121..151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(11..41));
assert_eq!(
bc.drain(1),
bc.ready_blocks(1),
blocks[1..11]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
@@ -285,16 +317,16 @@ mod test {
bc.clear_peer_download(&peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0.clone());
let drained = bc.drain(12);
let ready = bc.ready_blocks(12);
assert_eq!(
drained[..30],
ready[..30],
blocks[11..41]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[30..],
ready[30..],
blocks[41..81]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
@@ -308,17 +340,17 @@ mod test {
bc.clear_peer_download(&peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1.clone());
assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(bc.ready_blocks(80), vec![]);
let ready = bc.ready_blocks(81);
assert_eq!(
drained[..40],
ready[..40],
blocks[81..121]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[40..],
ready[40..],
blocks[121..150]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
@@ -344,4 +376,32 @@ mod test {
Some(100 + 128..100 + 128 + 128)
);
}
#[test]
fn no_duplicate_requests_on_fork() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer = PeerId::random();
let blocks = generate_blocks(10);
// count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200
assert_eq!(bc.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(40..45));
// got a response on the request for `40..45`
bc.clear_peer_download(&peer);
bc.insert(40, blocks[..5].to_vec(), peer.clone());
// our "node" started on a fork, with its current best = 47, which is > common
let ready = bc.ready_blocks(48);
assert_eq!(
ready,
blocks[..5]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer.clone()) })
.collect::<Vec<_>>()
);
assert_eq!(bc.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(45..50));
}
}
+184 -16
View File
@@ -1098,7 +1098,7 @@ where
{
self.blocks.insert(start_block, blocks, *who);
}
self.drain_blocks()
self.ready_blocks()
},
PeerSyncState::DownloadingGap(_) => {
peer.state = PeerSyncState::Available;
@@ -1112,7 +1112,7 @@ where
gap = true;
let blocks: Vec<_> = gap_sync
.blocks
.drain(gap_sync.best_queued_number + One::one())
.ready_blocks(gap_sync.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
let justifications =
@@ -1434,6 +1434,12 @@ where
OnBlockData::Import(origin, new_blocks)
}
fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.update_common_number(new_common);
}
}
/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
@@ -1515,9 +1521,12 @@ where
for (_, hash) in &results {
self.queue_blocks.remove(hash);
}
if let Some(from_hash) = results.first().map(|(_, h)| h) {
self.blocks.clear_queued(from_hash);
}
for (result, hash) in results {
if has_error {
continue
break
}
if result.is_err() {
@@ -1525,11 +1534,10 @@ where
}
match result {
Ok(BlockImportStatus::ImportedKnown(number, who)) => {
if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
peer.update_common_number(number);
}
},
Ok(BlockImportStatus::ImportedKnown(number, who)) =>
if let Some(peer) = who {
self.update_peer_common_number(&peer, number);
},
Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => {
if aux.clear_justification_requests {
trace!(
@@ -1558,8 +1566,8 @@ where
}
}
if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) {
peer.update_common_number(number);
if let Some(peer) = who {
self.update_peer_common_number(&peer, number);
}
let state_sync_complete =
self.state_sync.as_ref().map_or(false, |s| s.target() == hash);
@@ -1993,11 +2001,11 @@ where
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
if known && self.best_queued_number >= number {
peer.update_common_number(number);
self.update_peer_common_number(&who, number);
} else if announce.header.parent_hash() == &self.best_queued_hash ||
known_parent && self.best_queued_number >= number
{
peer.update_common_number(number - One::one());
self.update_peer_common_number(&who, number - One::one());
}
}
self.allowed_requests.add(&who);
@@ -2071,7 +2079,7 @@ where
target.peers.remove(who);
!target.peers.is_empty()
});
let blocks = self.drain_blocks();
let blocks = self.ready_blocks();
if !blocks.is_empty() {
Some(self.validate_and_queue_blocks(blocks, false))
} else {
@@ -2191,10 +2199,10 @@ where
}
}
/// Drain the downloaded block set up to the first gap.
fn drain_blocks(&mut self) -> Vec<IncomingBlock<B>> {
/// Get the set of downloaded blocks that are ready to be queued for import.
fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
self.blocks
.drain(self.best_queued_number + One::one())
.ready_blocks(self.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
let justifications = block_data
@@ -2981,6 +2989,25 @@ mod test {
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
let _ = sync.on_blocks_processed(
MAX_BLOCKS_TO_REQUEST as usize,
MAX_BLOCKS_TO_REQUEST as usize,
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
b.header().number().clone(),
Default::default(),
Some(peer_id1.clone()),
)),
b.hash(),
)
})
.collect(),
);
resp_blocks
.into_iter()
.rev()
@@ -3165,6 +3192,147 @@ mod test {
);
}
#[test]
fn syncs_fork_without_duplicate_requests() {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::new().build());
let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4)
.map(|_| build_block(&mut client, None, false))
.collect::<Vec<_>>();
let fork_blocks = {
let mut client = Arc::new(TestClientBuilder::new().build());
let fork_blocks = blocks[..MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2]
.into_iter()
.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
.cloned()
.collect::<Vec<_>>();
fork_blocks
.into_iter()
.chain(
(0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1)
.map(|_| build_block(&mut client, None, true)),
)
.collect::<Vec<_>>()
};
let info = client.info();
let mut sync = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
5,
None,
)
.unwrap();
let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
let just = (*b"TEST", Vec::new());
client
.finalize_block(BlockId::Hash(finalized_block.hash()), Some(just))
.unwrap();
sync.update_chain_info(&info.best_hash, info.best_number);
let peer_id1 = PeerId::random();
let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone();
// Connect the node we will sync from
sync.new_peer(peer_id1.clone(), common_block.hash(), *common_block.header().number())
.unwrap();
send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync);
let mut request =
get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1);
// Do the ancestor search
loop {
let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
request = match on_block_data.into_request() {
Some(req) => req.1,
// We found the ancenstor
None => break,
};
log::trace!(target: "sync", "Request: {:?}", request);
}
// Now request and import the fork.
let mut best_block_num = finalized_block.header().number().clone() as u32;
let mut request = get_block_request(
&mut sync,
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
MAX_BLOCKS_TO_REQUEST as u32,
&peer_id1,
);
let last_block_num = *fork_blocks.last().unwrap().header().number() as u32 - 1;
while best_block_num < last_block_num {
let from = unwrap_from_block_number(request.from.clone());
let mut resp_blocks = fork_blocks[best_block_num as usize..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST
),);
best_block_num += MAX_BLOCKS_TO_REQUEST as u32;
if best_block_num < last_block_num {
// make sure we're not getting a duplicate request in the time before the blocks are
// processed
request = get_block_request(
&mut sync,
FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64),
MAX_BLOCKS_TO_REQUEST as u32,
&peer_id1,
);
}
let _ = sync.on_blocks_processed(
MAX_BLOCKS_TO_REQUEST as usize,
MAX_BLOCKS_TO_REQUEST as usize,
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
b.header().number().clone(),
Default::default(),
Some(peer_id1.clone()),
)),
b.hash(),
)
})
.collect(),
);
resp_blocks
.into_iter()
.rev()
.for_each(|b| block_on(client.import(BlockOrigin::Own, b)).unwrap());
}
// Request the tip
get_block_request(
&mut sync,
FromBlock::Hash(fork_blocks.last().unwrap().hash()),
1,
&peer_id1,
);
}
#[test]
fn removes_target_fork_on_disconnect() {
sp_tracing::try_init_simple();