mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-09 16:27:58 +00:00
Sync: validate block responses for required data (#5052)
* Less verbose state-db logging * Validate block responses for block bodies * Update client/network/src/protocol.rs Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Added validation test * Disconnect on missing header as well * Typo Co-Authored-By: André Silva <andre.beat@gmail.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: André Silva <andre.beat@gmail.com>
This commit is contained in:
@@ -131,6 +131,8 @@ mod rep {
|
||||
pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
|
||||
/// Peer role does not match (e.g. light peer connecting to another light peer).
|
||||
pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role");
|
||||
/// Peer response data does not have requested bits.
|
||||
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
|
||||
}
|
||||
|
||||
// Lock must always be taken in order declared here.
|
||||
@@ -701,12 +703,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
peer: PeerId,
|
||||
request: message::BlockRequest<B>
|
||||
) {
|
||||
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}",
|
||||
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
|
||||
request.id,
|
||||
peer,
|
||||
request.from,
|
||||
request.to,
|
||||
request.max);
|
||||
request.max,
|
||||
request.fields,
|
||||
);
|
||||
|
||||
// sending block requests to the node that is unable to serve it is considered a bad behavior
|
||||
if !self.config.roles.is_full() {
|
||||
@@ -754,6 +758,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
message_queue: None,
|
||||
justification,
|
||||
};
|
||||
// Stop if we don't have requested block body
|
||||
if get_body && block_data.body.is_none() {
|
||||
trace!(target: "sync", "Missing data for block request.");
|
||||
break;
|
||||
}
|
||||
blocks.push(block_data);
|
||||
match request.direction {
|
||||
message::Direction::Ascending => id = BlockId::Number(number + One::one()),
|
||||
@@ -784,7 +793,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
request: message::BlockRequest<B>,
|
||||
response: message::BlockResponse<B>,
|
||||
) -> CustomMessageOutcome<B> {
|
||||
let blocks_range = match (
|
||||
let blocks_range = || match (
|
||||
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
||||
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
||||
) {
|
||||
@@ -796,7 +805,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
response.id,
|
||||
peer,
|
||||
response.blocks.len(),
|
||||
blocks_range
|
||||
blocks_range(),
|
||||
);
|
||||
|
||||
if request.fields == message::BlockAttributes::JUSTIFICATION {
|
||||
@@ -811,6 +820,20 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Validate fields against the request.
|
||||
if request.fields.contains(message::BlockAttributes::HEADER) && response.blocks.iter().any(|b| b.header.is_none()) {
|
||||
self.behaviour.disconnect_peer(&peer);
|
||||
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
|
||||
trace!(target: "sync", "Missing header for a block");
|
||||
return CustomMessageOutcome::None
|
||||
}
|
||||
if request.fields.contains(message::BlockAttributes::BODY) && response.blocks.iter().any(|b| b.body.is_none()) {
|
||||
self.behaviour.disconnect_peer(&peer);
|
||||
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
|
||||
trace!(target: "sync", "Missing body for a block");
|
||||
return CustomMessageOutcome::None
|
||||
}
|
||||
|
||||
match self.sync.on_block_data(peer, Some(request), response) {
|
||||
Ok(sync::OnBlockData::Import(origin, blocks)) =>
|
||||
CustomMessageOutcome::BlockImport(origin, blocks),
|
||||
|
||||
@@ -751,7 +751,7 @@ impl<B: BlockT> ChainSync<B> {
|
||||
| PeerSyncState::DownloadingFinalityProof(..) => Vec::new()
|
||||
}
|
||||
} else {
|
||||
// When request.is_none() just accept blocks
|
||||
// When request.is_none() this is a block announcement. Just accept blocks.
|
||||
blocks.into_iter().map(|b| {
|
||||
IncomingBlock {
|
||||
hash: b.hash,
|
||||
|
||||
@@ -237,7 +237,7 @@ impl<D> Peer<D> {
|
||||
where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block
|
||||
{
|
||||
let best_hash = self.client.info().best_hash;
|
||||
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block)
|
||||
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block, false)
|
||||
}
|
||||
|
||||
/// Add blocks to the peer -- edit the block before adding. The chain will
|
||||
@@ -247,7 +247,8 @@ impl<D> Peer<D> {
|
||||
at: BlockId<Block>,
|
||||
count: usize,
|
||||
origin: BlockOrigin,
|
||||
mut edit_block: F
|
||||
mut edit_block: F,
|
||||
headers_only: bool,
|
||||
) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block {
|
||||
let full_client = self.client.as_full()
|
||||
.expect("blocks could only be generated by full clients");
|
||||
@@ -272,7 +273,7 @@ impl<D> Peer<D> {
|
||||
origin,
|
||||
header.clone(),
|
||||
None,
|
||||
Some(block.extrinsics)
|
||||
if headers_only { None } else { Some(block.extrinsics) },
|
||||
).unwrap();
|
||||
let cache = if let Some(cache) = cache {
|
||||
cache.into_iter().collect()
|
||||
@@ -294,28 +295,46 @@ impl<D> Peer<D> {
|
||||
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
|
||||
}
|
||||
|
||||
/// Push blocks to the peer (simplified: with or without a TX)
|
||||
pub fn push_headers(&mut self, count: usize) -> H256 {
|
||||
let best_hash = self.client.info().best_hash;
|
||||
self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true)
|
||||
}
|
||||
|
||||
/// Push blocks to the peer (simplified: with or without a TX) starting from
|
||||
/// given hash.
|
||||
pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
|
||||
self.generate_tx_blocks_at(at, count, with_tx, false)
|
||||
}
|
||||
|
||||
/// Push blocks/headers to the peer (simplified: with or without a TX) starting from
|
||||
/// given hash.
|
||||
fn generate_tx_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool, headers_only:bool) -> H256 {
|
||||
let mut nonce = 0;
|
||||
if with_tx {
|
||||
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
|
||||
let transfer = Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Alice.into(),
|
||||
amount: 1,
|
||||
nonce,
|
||||
};
|
||||
builder.push(transfer.into_signed_tx()).unwrap();
|
||||
nonce = nonce + 1;
|
||||
builder.build().unwrap().block
|
||||
})
|
||||
self.generate_blocks_at(
|
||||
at,
|
||||
count,
|
||||
BlockOrigin::File, |mut builder| {
|
||||
let transfer = Transfer {
|
||||
from: AccountKeyring::Alice.into(),
|
||||
to: AccountKeyring::Alice.into(),
|
||||
amount: 1,
|
||||
nonce,
|
||||
};
|
||||
builder.push(transfer.into_signed_tx()).unwrap();
|
||||
nonce = nonce + 1;
|
||||
builder.build().unwrap().block
|
||||
},
|
||||
headers_only
|
||||
)
|
||||
} else {
|
||||
self.generate_blocks_at(
|
||||
at,
|
||||
count,
|
||||
BlockOrigin::File,
|
||||
|builder| builder.build().unwrap().block,
|
||||
headers_only,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -748,6 +767,23 @@ pub trait TestNetFactory: Sized {
|
||||
Async::Ready(())
|
||||
}
|
||||
|
||||
/// Polls the testnet until theres' no activiy of any kind.
|
||||
///
|
||||
/// Must be executed in a task context.
|
||||
fn poll_until_idle(&mut self) -> Async<()> {
|
||||
self.poll();
|
||||
|
||||
for peer in self.peers().iter() {
|
||||
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
|
||||
return Async::NotReady
|
||||
}
|
||||
if peer.network.num_sync_requests() != 0 {
|
||||
return Async::NotReady
|
||||
}
|
||||
}
|
||||
Async::Ready(())
|
||||
}
|
||||
|
||||
/// Blocks the current thread until we are sync'ed.
|
||||
///
|
||||
/// Calls `poll_until_sync` repeatedly with the runtime passed as parameter.
|
||||
@@ -755,6 +791,13 @@ pub trait TestNetFactory: Sized {
|
||||
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap();
|
||||
}
|
||||
|
||||
/// Blocks the current thread until there are no pending packets.
|
||||
///
|
||||
/// Calls `poll_until_idle` repeatedly with the runtime passed as parameter.
|
||||
fn block_until_idle(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
|
||||
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_idle()))).unwrap();
|
||||
}
|
||||
|
||||
/// Polls the testnet. Processes all the pending actions and returns `NotReady`.
|
||||
fn poll(&mut self) {
|
||||
self.mut_peers(|peers| {
|
||||
|
||||
@@ -660,3 +660,24 @@ fn does_not_sync_announced_old_best_block() {
|
||||
})).unwrap();
|
||||
assert!(!net.peer(1).is_major_syncing());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn full_sync_requires_block_body() {
|
||||
// Check that we don't sync headers-only in full mode.
|
||||
let _ = ::env_logger::try_init();
|
||||
let mut runtime = current_thread::Runtime::new().unwrap();
|
||||
let mut net = TestNet::new(2);
|
||||
|
||||
net.peer(0).push_headers(1);
|
||||
// Wait for nodes to connect
|
||||
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
|
||||
net.poll();
|
||||
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
})).unwrap();
|
||||
net.block_until_idle(&mut runtime);
|
||||
assert_eq!(net.peer(1).client.info().best_number, 0);
|
||||
}
|
||||
|
||||
@@ -340,7 +340,7 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
{
|
||||
let refs = self.pinned.entry(hash.clone()).or_default();
|
||||
if *refs == 0 {
|
||||
trace!(target: "state-db", "Pinned block: {:?}", hash);
|
||||
trace!(target: "state-db-pin", "Pinned block: {:?}", hash);
|
||||
self.non_canonical.pin(hash);
|
||||
}
|
||||
*refs += 1;
|
||||
@@ -357,11 +357,11 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
|
||||
Entry::Occupied(mut entry) => {
|
||||
*entry.get_mut() -= 1;
|
||||
if *entry.get() == 0 {
|
||||
trace!(target: "state-db", "Unpinned block: {:?}", hash);
|
||||
trace!(target: "state-db-pin", "Unpinned block: {:?}", hash);
|
||||
entry.remove();
|
||||
self.non_canonical.unpin(hash);
|
||||
} else {
|
||||
trace!(target: "state-db", "Releasing reference for {:?}", hash);
|
||||
trace!(target: "state-db-pin", "Releasing reference for {:?}", hash);
|
||||
}
|
||||
},
|
||||
Entry::Vacant(_) => {},
|
||||
|
||||
@@ -436,7 +436,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
while let Some(hash) = parent {
|
||||
let refs = self.pinned.entry(hash.clone()).or_default();
|
||||
if *refs == 0 {
|
||||
trace!(target: "state-db", "Pinned non-canon block: {:?}", hash);
|
||||
trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash);
|
||||
}
|
||||
*refs += 1;
|
||||
parent = self.parents.get(hash);
|
||||
@@ -455,7 +455,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
|
||||
if *entry.get() == 0 {
|
||||
entry.remove();
|
||||
if let Some(inserted) = self.pinned_insertions.remove(&hash) {
|
||||
trace!(target: "state-db", "Discarding unpinned non-canon block: {:?}", hash);
|
||||
trace!(target: "state-db-pin", "Discarding unpinned non-canon block: {:?}", hash);
|
||||
discard_values(&mut self.values, inserted);
|
||||
self.parents.remove(&hash);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user