mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 12:17:58 +00:00
Use block requests to check if block responses are correct (#7653)
* Use block requests to check if block responses are correct Before this pr sync relied on recently announced blocks to check if a given peer response is correct. However this could lead to situations where we requested a block from a peer and it gave us the requested, but we rejected the response because this peer never send us an announcement for the given block. See the added tests for a reproduction of the problem. With this pr, we now take the block request to check if a given response matches the request. A node should not send us a block response without a request anyway. Essentially there is still a bug, because as you see in the test, we are requesting block 2, while we already have this block imported. It even happens that we request a block from the network that we have authored. However a fix for this would require some more refactoring of the sync code. * Revert change * Give the test a proper name * Add moar logging * Move cheaper checks * Move checks to common place
This commit is contained in:
@@ -34,8 +34,8 @@ use sp_consensus::{BlockOrigin, BlockStatus,
|
||||
block_validation::{BlockAnnounceValidator, Validation},
|
||||
import_queue::{IncomingBlock, BlockImportResult, BlockImportError}
|
||||
};
|
||||
use crate::{
|
||||
protocol::message::{self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse, Roles},
|
||||
use crate::protocol::message::{
|
||||
self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse, Roles,
|
||||
};
|
||||
use either::Either;
|
||||
use extra_requests::ExtraRequests;
|
||||
@@ -44,12 +44,14 @@ use log::{debug, trace, warn, info, error};
|
||||
use sp_runtime::{
|
||||
Justification,
|
||||
generic::BlockId,
|
||||
traits::{Block as BlockT, Header, NumberFor, Zero, One, CheckedSub, SaturatedConversion, Hash, HashFor}
|
||||
traits::{
|
||||
Block as BlockT, Header as HeaderT, NumberFor, Zero, One, CheckedSub, SaturatedConversion,
|
||||
Hash, HashFor,
|
||||
},
|
||||
};
|
||||
use sp_arithmetic::traits::Saturating;
|
||||
use std::{
|
||||
fmt, ops::Range, collections::{HashMap, hash_map::Entry, HashSet, VecDeque},
|
||||
sync::Arc, pin::Pin,
|
||||
fmt, ops::Range, collections::{HashMap, hash_map::Entry, HashSet}, sync::Arc, pin::Pin,
|
||||
};
|
||||
use futures::{task::Poll, Future, stream::FuturesUnordered, FutureExt, StreamExt};
|
||||
|
||||
@@ -83,9 +85,6 @@ const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
|
||||
/// so far behind.
|
||||
const MAJOR_SYNC_BLOCKS: u8 = 5;
|
||||
|
||||
/// Number of recently announced blocks to track for each peer.
|
||||
const ANNOUNCE_HISTORY_SIZE: usize = 64;
|
||||
|
||||
mod rep {
|
||||
use sc_peerset::ReputationChange as Rep;
|
||||
/// Reputation change when a peer sent us a message that led to a
|
||||
@@ -108,14 +107,17 @@ mod rep {
|
||||
/// Peer did not provide us with advertised block data.
|
||||
pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
|
||||
|
||||
/// Reputation change for peers which send us a known block.
|
||||
pub const KNOWN_BLOCK: Rep = Rep::new(-(1 << 29), "Duplicate block");
|
||||
/// Reputation change for peers which send us non-requested block data.
|
||||
pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
|
||||
|
||||
/// Reputation change for peers which send us a block with bad justifications.
|
||||
pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
|
||||
|
||||
/// Reputation change when a peer sent us invlid ancestry result.
|
||||
pub const UNKNOWN_ANCESTOR:Rep = Rep::new(-(1 << 16), "DB Error");
|
||||
|
||||
/// Peer response data does not have requested bits.
|
||||
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
|
||||
}
|
||||
|
||||
enum PendingRequests {
|
||||
@@ -219,9 +221,6 @@ pub struct PeerSync<B: BlockT> {
|
||||
/// The state of syncing this peer is in for us, generally categories
|
||||
/// into `Available` or "busy" with something as defined by `PeerSyncState`.
|
||||
pub state: PeerSyncState<B>,
|
||||
/// A queue of blocks that this peer has announced to us, should only
|
||||
/// contain `ANNOUNCE_HISTORY_SIZE` entries.
|
||||
pub recently_announced: VecDeque<B::Hash>
|
||||
}
|
||||
|
||||
/// The sync status of a peer we are trying to sync with
|
||||
@@ -514,7 +513,6 @@ impl<B: BlockT> ChainSync<B> {
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::Available,
|
||||
recently_announced: Default::default()
|
||||
});
|
||||
return Ok(None)
|
||||
}
|
||||
@@ -527,7 +525,6 @@ impl<B: BlockT> ChainSync<B> {
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::Available,
|
||||
recently_announced: Default::default(),
|
||||
});
|
||||
self.pending_requests.add(&who);
|
||||
return Ok(None)
|
||||
@@ -551,7 +548,6 @@ impl<B: BlockT> ChainSync<B> {
|
||||
start: self.best_queued_number,
|
||||
state: AncestorSearchState::ExponentialBackoff(One::one()),
|
||||
},
|
||||
recently_announced: Default::default()
|
||||
});
|
||||
|
||||
Ok(Some(ancestry_request::<B>(common_best)))
|
||||
@@ -563,7 +559,6 @@ impl<B: BlockT> ChainSync<B> {
|
||||
best_hash,
|
||||
best_number,
|
||||
state: PeerSyncState::Available,
|
||||
recently_announced: Default::default(),
|
||||
});
|
||||
self.pending_requests.add(&who);
|
||||
Ok(None)
|
||||
@@ -751,13 +746,13 @@ impl<B: BlockT> ChainSync<B> {
|
||||
blocks.reverse()
|
||||
}
|
||||
self.pending_requests.add(who);
|
||||
if request.is_some() {
|
||||
if let Some(request) = request {
|
||||
match &mut peer.state {
|
||||
PeerSyncState::DownloadingNew(start_block) => {
|
||||
self.blocks.clear_peer_download(who);
|
||||
let start_block = *start_block;
|
||||
peer.state = PeerSyncState::Available;
|
||||
validate_blocks::<B>(&blocks, who)?;
|
||||
validate_blocks::<B>(&blocks, who, Some(request))?;
|
||||
self.blocks.insert(start_block, blocks, who.clone());
|
||||
self.blocks
|
||||
.drain(self.best_queued_number + One::one())
|
||||
@@ -780,7 +775,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
debug!(target: "sync", "Empty block response from {}", who);
|
||||
return Err(BadPeer(who.clone(), rep::NO_BLOCK));
|
||||
}
|
||||
validate_blocks::<B>(&blocks, who)?;
|
||||
validate_blocks::<B>(&blocks, who, Some(request))?;
|
||||
blocks.into_iter().map(|b| {
|
||||
IncomingBlock {
|
||||
hash: b.hash,
|
||||
@@ -864,7 +859,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
} else {
|
||||
// When request.is_none() this is a block announcement. Just accept blocks.
|
||||
validate_blocks::<B>(&blocks, who)?;
|
||||
validate_blocks::<B>(&blocks, who, None)?;
|
||||
blocks.into_iter().map(|b| {
|
||||
IncomingBlock {
|
||||
hash: b.hash,
|
||||
@@ -878,40 +873,30 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}).collect()
|
||||
}
|
||||
} else {
|
||||
Vec::new()
|
||||
// We don't know of this peer, so we also did not request anything from it.
|
||||
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED));
|
||||
};
|
||||
|
||||
// When doing initial sync we don't request blocks in parallel.
|
||||
// So the only way this can happen is when peers lie about the
|
||||
// common block.
|
||||
let is_recent = new_blocks.first()
|
||||
.map(|block| {
|
||||
self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash))
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
if !is_recent && new_blocks.last().map_or(false, |b| self.is_known(&b.hash)) {
|
||||
// When doing initial sync we don't request blocks in parallel.
|
||||
// So the only way this can happen is when peers lie about the
|
||||
// common block.
|
||||
debug!(target: "sync", "Ignoring known blocks from {}", who);
|
||||
return Err(BadPeer(who.clone(), rep::KNOWN_BLOCK));
|
||||
}
|
||||
let orig_len = new_blocks.len();
|
||||
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
|
||||
if new_blocks.len() != orig_len {
|
||||
debug!(target: "sync", "Ignoring {} blocks that are already queued", orig_len - new_blocks.len());
|
||||
}
|
||||
|
||||
let origin =
|
||||
if is_recent {
|
||||
BlockOrigin::NetworkBroadcast
|
||||
} else {
|
||||
BlockOrigin::NetworkInitialSync
|
||||
};
|
||||
let origin = if self.status().state != SyncState::Downloading {
|
||||
BlockOrigin::NetworkBroadcast
|
||||
} else {
|
||||
BlockOrigin::NetworkInitialSync
|
||||
};
|
||||
|
||||
if let Some((h, n)) = new_blocks.last().and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number()))) {
|
||||
trace!(target:"sync", "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), h, origin);
|
||||
trace!(
|
||||
target:"sync",
|
||||
"Accepted {} blocks ({:?}) with origin {:?}",
|
||||
new_blocks.len(),
|
||||
h,
|
||||
origin,
|
||||
);
|
||||
self.on_block_queued(h, n)
|
||||
}
|
||||
|
||||
@@ -1320,11 +1305,6 @@ impl<B: BlockT> ChainSync<B> {
|
||||
return PollBlockAnnounceValidation::Nothing { is_best, who, header }
|
||||
};
|
||||
|
||||
while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE {
|
||||
peer.recently_announced.pop_front();
|
||||
}
|
||||
peer.recently_announced.push_back(hash.clone());
|
||||
|
||||
if is_best {
|
||||
// update their best block
|
||||
peer.best_number = number;
|
||||
@@ -1600,8 +1580,7 @@ fn fork_sync_request<B: BlockT>(
|
||||
finalized: NumberFor<B>,
|
||||
attributes: &message::BlockAttributes,
|
||||
check_block: impl Fn(&B::Hash) -> BlockStatus,
|
||||
) -> Option<(B::Hash, BlockRequest<B>)>
|
||||
{
|
||||
) -> Option<(B::Hash, BlockRequest<B>)> {
|
||||
targets.retain(|hash, r| {
|
||||
if r.number <= finalized {
|
||||
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number);
|
||||
@@ -1654,7 +1633,75 @@ fn is_descendent_of<Block, T>(client: &T, base: &Block::Hash, block: &Block::Has
|
||||
Ok(ancestor.hash == *base)
|
||||
}
|
||||
|
||||
fn validate_blocks<Block: BlockT>(blocks: &Vec<message::BlockData<Block>>, who: &PeerId) -> Result<(), BadPeer> {
|
||||
/// Validate that the given `blocks` are correct.
|
||||
///
|
||||
/// It is expected that `blocks` are in asending order.
|
||||
fn validate_blocks<Block: BlockT>(
|
||||
blocks: &Vec<message::BlockData<Block>>,
|
||||
who: &PeerId,
|
||||
request: Option<BlockRequest<Block>>,
|
||||
) -> Result<(), BadPeer> {
|
||||
if let Some(request) = request {
|
||||
if Some(blocks.len() as _) > request.max {
|
||||
debug!(
|
||||
target: "sync",
|
||||
"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
|
||||
who,
|
||||
request.max,
|
||||
blocks.len(),
|
||||
);
|
||||
|
||||
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED))
|
||||
}
|
||||
|
||||
let block_header = if request.direction == message::Direction::Descending {
|
||||
blocks.last()
|
||||
} else {
|
||||
blocks.first()
|
||||
}.and_then(|b| b.header.as_ref());
|
||||
|
||||
let expected_block = block_header.as_ref()
|
||||
.map_or(false, |h| match request.from {
|
||||
message::FromBlock::Hash(hash) => h.hash() == hash,
|
||||
message::FromBlock::Number(n) => h.number() == &n,
|
||||
});
|
||||
|
||||
if !expected_block {
|
||||
debug!(
|
||||
target: "sync",
|
||||
"Received block that was not requested. Requested {:?}, got {:?}.",
|
||||
request.from,
|
||||
block_header,
|
||||
);
|
||||
|
||||
return Err(BadPeer(who.clone(), rep::NOT_REQUESTED))
|
||||
}
|
||||
|
||||
if request.fields.contains(message::BlockAttributes::HEADER)
|
||||
&& blocks.iter().any(|b| b.header.is_none())
|
||||
{
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Missing requested header for a block in response from {}.",
|
||||
who,
|
||||
);
|
||||
|
||||
return Err(BadPeer(who.clone(), rep::BAD_RESPONSE))
|
||||
}
|
||||
|
||||
if request.fields.contains(message::BlockAttributes::BODY)
|
||||
&& blocks.iter().any(|b| b.body.is_none())
|
||||
{
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Missing requested body for a block in response from {}.",
|
||||
who,
|
||||
);
|
||||
|
||||
return Err(BadPeer(who.clone(), rep::BAD_RESPONSE))
|
||||
}
|
||||
}
|
||||
|
||||
for b in blocks {
|
||||
if let Some(header) = &b.header {
|
||||
let hash = header.hash();
|
||||
@@ -1685,20 +1732,23 @@ fn validate_blocks<Block: BlockT>(blocks: &Vec<message::BlockData<Block>>, who:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::message::FromBlock;
|
||||
use super::message::{FromBlock, BlockState, BlockData};
|
||||
use super::*;
|
||||
use sc_block_builder::BlockBuilderProvider;
|
||||
use sp_blockchain::HeaderBackend;
|
||||
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
|
||||
use substrate_test_runtime_client::{
|
||||
runtime::{Block, Hash},
|
||||
runtime::{Block, Hash, Header},
|
||||
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
|
||||
BlockBuilderExt,
|
||||
};
|
||||
use futures::{future::poll_fn, executor::block_on};
|
||||
|
||||
#[test]
|
||||
fn processes_empty_response_on_justification_request_for_unknown_block() {
|
||||
@@ -1846,4 +1896,177 @@ mod test {
|
||||
PeerSyncState::DownloadingJustification(b1_hash),
|
||||
);
|
||||
}
|
||||
|
||||
/// Send a block annoucnement for the given `header`.
|
||||
fn send_block_announce(
|
||||
header: Header,
|
||||
peer_id: &PeerId,
|
||||
sync: &mut ChainSync<Block>,
|
||||
) {
|
||||
let block_annnounce = BlockAnnounce {
|
||||
header: header.clone(),
|
||||
state: Some(BlockState::Best),
|
||||
data: Some(Vec::new()),
|
||||
};
|
||||
|
||||
sync.push_block_announce_validation(
|
||||
peer_id.clone(),
|
||||
header.hash(),
|
||||
block_annnounce,
|
||||
true,
|
||||
);
|
||||
|
||||
// Poll until we have procssed the block announcement
|
||||
block_on(poll_fn(|cx| loop {
|
||||
if sync.poll_block_announce_validation(cx).is_pending() {
|
||||
break Poll::Ready(())
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/// Create a block response from the given `blocks`.
|
||||
fn create_block_response(blocks: Vec<Block>) -> BlockResponse<Block> {
|
||||
BlockResponse::<Block> {
|
||||
id: 0,
|
||||
blocks: blocks.into_iter().map(|b|
|
||||
BlockData::<Block> {
|
||||
hash: b.hash(),
|
||||
header: Some(b.header().clone()),
|
||||
body: Some(b.deconstruct().1),
|
||||
receipt: None,
|
||||
message_queue: None,
|
||||
justification: None,
|
||||
}
|
||||
).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a block request from `sync` and check that is matches the expected request.
|
||||
fn get_block_request(
|
||||
sync: &mut ChainSync<Block>,
|
||||
from: message::FromBlock<Hash, u64>,
|
||||
max: u32,
|
||||
peer: &PeerId,
|
||||
) -> BlockRequest<Block> {
|
||||
let requests = sync.block_requests().collect::<Vec<_>>();
|
||||
assert_eq!(1, requests.len());
|
||||
assert_eq!(peer, requests[0].0);
|
||||
|
||||
let request = requests[0].1.clone();
|
||||
|
||||
assert_eq!(from, request.from);
|
||||
assert_eq!(Some(max), request.max);
|
||||
request
|
||||
}
|
||||
|
||||
/// This test is a regression test as observed on a real network.
|
||||
///
|
||||
/// The node is connected to multiple peers. Both of these peers are having a best block (1) that
|
||||
/// is below our best block (3). Now peer 2 announces a fork of block 3 that we will
|
||||
/// request from peer 2. After imporitng the fork, peer 2 and then peer 1 will announce block 4.
|
||||
/// But as peer 1 in our view is still at block 1, we will request block 2 (which we already have)
|
||||
/// from it. In the meanwhile peer 2 sends us block 4 and 3 and we send another request for block
|
||||
/// 2 to peer 2. Peer 1 answers with block 2 and then peer 2. This will need to succeed, as we
|
||||
/// have requested block 2 from both peers.
|
||||
#[test]
|
||||
fn do_not_report_peer_on_block_response_for_block_request() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let mut client = Arc::new(TestClientBuilder::new().build());
|
||||
let info = client.info();
|
||||
|
||||
let mut sync = ChainSync::new(
|
||||
Roles::AUTHORITY,
|
||||
client.clone(),
|
||||
&info,
|
||||
Box::new(DefaultBlockAnnounceValidator),
|
||||
5,
|
||||
);
|
||||
|
||||
let peer_id1 = PeerId::random();
|
||||
let peer_id2 = PeerId::random();
|
||||
|
||||
let mut client2 = client.clone();
|
||||
let mut build_block = || {
|
||||
let block = client2.new_block(Default::default()).unwrap().build().unwrap().block;
|
||||
client2.import(BlockOrigin::Own, block.clone()).unwrap();
|
||||
|
||||
block
|
||||
};
|
||||
|
||||
let mut client2 = client.clone();
|
||||
let mut build_block_at = |at, import| {
|
||||
let mut block_builder = client2.new_block_at(&BlockId::Hash(at), Default::default(), false)
|
||||
.unwrap();
|
||||
// Make sure we generate a different block as fork
|
||||
block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
|
||||
|
||||
let block = block_builder.build().unwrap().block;
|
||||
|
||||
if import {
|
||||
client2.import(BlockOrigin::Own, block.clone()).unwrap();
|
||||
}
|
||||
|
||||
block
|
||||
};
|
||||
|
||||
let block1 = build_block();
|
||||
let block2 = build_block();
|
||||
let block3 = build_block();
|
||||
let block3_fork = build_block_at(block2.hash(), false);
|
||||
|
||||
// Add two peers which are on block 1.
|
||||
sync.new_peer(peer_id1.clone(), block1.hash(), 1).unwrap();
|
||||
sync.new_peer(peer_id2.clone(), block1.hash(), 1).unwrap();
|
||||
|
||||
// Tell sync that our best block is 3.
|
||||
sync.update_chain_info(&block3.hash(), 3);
|
||||
|
||||
// There should be no requests.
|
||||
assert!(sync.block_requests().collect::<Vec<_>>().is_empty());
|
||||
|
||||
// Let peer2 announce a fork of block 3
|
||||
send_block_announce(block3_fork.header().clone(), &peer_id2, &mut sync);
|
||||
|
||||
// Import and tell sync that we now have the fork.
|
||||
client.import(BlockOrigin::Own, block3_fork.clone()).unwrap();
|
||||
sync.update_chain_info(&block3_fork.hash(), 3);
|
||||
|
||||
let block4 = build_block_at(block3_fork.hash(), false);
|
||||
|
||||
// Let peer2 announce block 4 and check that sync wants to get the block.
|
||||
send_block_announce(block4.header().clone(), &peer_id2, &mut sync);
|
||||
|
||||
let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 2, &peer_id2);
|
||||
|
||||
// Peer1 announces the same block, but as the common block is still `1`, sync will request
|
||||
// block 2 again.
|
||||
send_block_announce(block4.header().clone(), &peer_id1, &mut sync);
|
||||
|
||||
let request2 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id1);
|
||||
|
||||
let response = create_block_response(vec![block4.clone(), block3_fork.clone()]);
|
||||
let res = sync.on_block_data(&peer_id2, Some(request), response).unwrap();
|
||||
|
||||
// We should not yet import the blocks, because there is still an open request for fetching
|
||||
// block `2` which blocks the import.
|
||||
assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty()));
|
||||
|
||||
let request3 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id2);
|
||||
|
||||
let response = create_block_response(vec![block2.clone()]);
|
||||
let res = sync.on_block_data(&peer_id1, Some(request2), response).unwrap();
|
||||
assert!(
|
||||
matches!(
|
||||
res,
|
||||
OnBlockData::Import(_, blocks)
|
||||
if blocks.iter().all(|b| [2, 3, 4].contains(b.header.as_ref().unwrap().number()))
|
||||
)
|
||||
);
|
||||
|
||||
let response = create_block_response(vec![block2.clone()]);
|
||||
let res = sync.on_block_data(&peer_id2, Some(request3), response).unwrap();
|
||||
// Nothing to import
|
||||
assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -549,7 +549,6 @@ mod tests {
|
||||
best_hash: Hash::random(),
|
||||
best_number: g.gen(),
|
||||
state: ArbitraryPeerSyncState::arbitrary(g).0,
|
||||
recently_announced: Default::default()
|
||||
};
|
||||
ArbitraryPeerSync(ps)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user