Extract block announce validation from ChainSync (#1170)

This commit is contained in:
Dmitry Markin
2023-09-04 18:27:53 +03:00
committed by GitHub
parent ddab7156b4
commit 01cdae878d
7 changed files with 731 additions and 550 deletions
+7 -61
View File
@@ -22,12 +22,12 @@ pub mod message;
pub mod metrics;
pub mod warp;
use crate::{role::Roles, types::ReputationChange};
use crate::{role::Roles, sync::message::BlockAnnounce, types::ReputationChange};
use futures::Stream;
use libp2p_identity::PeerId;
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use message::{BlockData, BlockRequest, BlockResponse};
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
@@ -157,38 +157,6 @@ pub enum ImportResult<B: BlockT> {
JustificationImport(RuntimeOrigin, B::Hash, NumberFor<B>, Justifications),
}
/// Value polled from `ChainSync`
#[derive(Debug)]
pub enum PollResult<B: BlockT> {
Import(ImportResult<B>),
Announce(PollBlockAnnounceValidation<B::Header>),
}
/// Result of [`ChainSync::poll_block_announce_validation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollBlockAnnounceValidation<H> {
/// The announcement failed at validation.
///
/// The peer reputation should be decreased.
Failure {
/// Who sent the processed block announcement?
who: PeerId,
/// Should the peer be disconnected?
disconnect: bool,
},
/// The announcement does not require further handling.
Nothing {
/// Who sent the processed block announcement?
who: PeerId,
/// Was this their new best block?
is_best: bool,
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The block announcement should be skipped.
Skip,
}
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
@@ -408,29 +376,14 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Notify about finalization of the given block.
fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor<Block>);
/// Push a block announce validation.
///
/// It is required that [`ChainSync::poll_block_announce_validation`] is called
/// to check for finished block announce validations.
fn push_block_announce_validation(
/// Notify about pre-validated block announcement.
fn on_validated_block_announce(
&mut self,
who: PeerId,
hash: Block::Hash,
announce: BlockAnnounce<Block::Header>,
is_best: bool,
who: PeerId,
announce: &BlockAnnounce<Block::Header>,
);
/// Poll block announce validation.
///
/// Block announce validations can be pushed by using
/// [`ChainSync::push_block_announce_validation`].
///
/// This should be polled until it returns [`Poll::Pending`].
fn poll_block_announce_validation(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
@@ -447,14 +400,7 @@ pub trait ChainSync<Block: BlockT>: Send {
) -> Result<Vec<BlockData<Block>>, String>;
/// Advance the state of `ChainSync`
///
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()>;
/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
@@ -0,0 +1,405 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! `BlockAnnounceValidator` is responsible for async validation of block announcements.
use crate::futures_stream::FuturesStream;
use futures::{Future, FutureExt, Stream, StreamExt};
use libp2p::PeerId;
use log::{debug, error, trace, warn};
use sc_network_common::sync::message::BlockAnnounce;
use sp_consensus::block_validation::Validation;
use sp_runtime::traits::{Block as BlockT, Header, Zero};
use std::{
collections::{hash_map::Entry, HashMap},
default::Default,
pin::Pin,
task::{Context, Poll},
};
/// Log target for this file.
const LOG_TARGET: &str = "sync";
/// Maximum number of concurrent block announce validations.
///
/// If the queue reaches the maximum, we drop any new block
/// announcements.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256;
/// Maximum number of concurrent block announce validations per peer.
///
/// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
/// Item that yields [`Stream`] implementation of [`BlockAnnounceValidator`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BlockAnnounceValidationResult<H> {
/// The announcement failed at validation.
///
/// The peer reputation should be decreased.
Failure {
/// The id of the peer that send us the announcement.
peer_id: PeerId,
/// Should the peer be disconnected?
disconnect: bool,
},
/// The announcement was validated successfully and should be passed to [`crate::ChainSync`].
Process {
/// The id of the peer that send us the announcement.
peer_id: PeerId,
/// Was this their new best block?
is_new_best: bool,
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The block announcement should be skipped.
Skip {
/// The id of the peer that send us the announcement.
peer_id: PeerId,
},
}
impl<H> BlockAnnounceValidationResult<H> {
fn peer_id(&self) -> &PeerId {
match self {
BlockAnnounceValidationResult::Failure { peer_id, .. } |
BlockAnnounceValidationResult::Process { peer_id, .. } |
BlockAnnounceValidationResult::Skip { peer_id } => peer_id,
}
}
}
/// Result of [`BlockAnnounceValidator::allocate_slot_for_block_announce_validation`].
enum AllocateSlotForBlockAnnounceValidation {
/// Success, there is a slot for the block announce validation.
Allocated,
/// We reached the total maximum number of validation slots.
TotalMaximumSlotsReached,
/// We reached the maximum number of validation slots for the given peer.
MaximumPeerSlotsReached,
}
pub(crate) struct BlockAnnounceValidator<B: BlockT> {
/// A type to check incoming block announcements.
validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
/// All block announcements that are currently being validated.
validations: FuturesStream<
Pin<Box<dyn Future<Output = BlockAnnounceValidationResult<B::Header>> + Send>>,
>,
/// Number of concurrent block announce validations per peer.
validations_per_peer: HashMap<PeerId, usize>,
}
impl<B: BlockT> BlockAnnounceValidator<B> {
pub(crate) fn new(
validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
) -> Self {
Self {
validator,
validations: Default::default(),
validations_per_peer: Default::default(),
}
}
/// Push a block announce validation.
pub(crate) fn push_block_announce_validation(
&mut self,
peer_id: PeerId,
hash: B::Hash,
announce: BlockAnnounce<B::Header>,
is_best: bool,
) {
let header = &announce.header;
let number = *header.number();
debug!(
target: LOG_TARGET,
"Pre-validating received block announcement {:?} with number {:?} from {}",
hash,
number,
peer_id,
);
if number.is_zero() {
warn!(
target: LOG_TARGET,
"💔 Ignored genesis block (#0) announcement from {}: {}",
peer_id,
hash,
);
return
}
// Try to allocate a slot for this block announce validation.
match self.allocate_slot_for_block_announce_validation(&peer_id) {
AllocateSlotForBlockAnnounceValidation::Allocated => {},
AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => {
warn!(
target: LOG_TARGET,
"💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.",
number,
hash,
peer_id,
);
return
},
AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => {
warn!(
target: LOG_TARGET,
"💔 Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.",
number,
hash,
peer_id,
);
return
},
}
// Let external validator check the block announcement.
let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
let future = self.validator.validate(header, assoc_data);
self.validations.push(
async move {
match future.await {
Ok(Validation::Success { is_new_best }) => {
let is_new_best = is_new_best || is_best;
trace!(
target: LOG_TARGET,
"Block announcement validated successfully: from {}: {:?}. Local best: {}.",
peer_id,
announce.summary(),
is_new_best,
);
BlockAnnounceValidationResult::Process { is_new_best, announce, peer_id }
},
Ok(Validation::Failure { disconnect }) => {
debug!(
target: LOG_TARGET,
"Block announcement validation failed: from {}, block {:?}. Disconnect: {}.",
peer_id,
hash,
disconnect,
);
BlockAnnounceValidationResult::Failure { peer_id, disconnect }
},
Err(e) => {
debug!(
target: LOG_TARGET,
"💔 Ignoring block announcement validation from {} of block {:?} due to internal error: {}.",
peer_id,
hash,
e,
);
BlockAnnounceValidationResult::Skip { peer_id }
},
}
}
.boxed(),
);
}
/// Checks if there is a slot for a block announce validation.
///
/// The total number and the number per peer of concurrent block announce validations
/// is capped.
///
/// Returns [`AllocateSlotForBlockAnnounceValidation`] to inform about the result.
///
/// # Note
///
/// It is *required* to call [`Self::deallocate_slot_for_block_announce_validation`] when the
/// validation is finished to clear the slot.
fn allocate_slot_for_block_announce_validation(
&mut self,
peer_id: &PeerId,
) -> AllocateSlotForBlockAnnounceValidation {
if self.validations.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
return AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
}
match self.validations_per_peer.entry(*peer_id) {
Entry::Vacant(entry) => {
entry.insert(1);
AllocateSlotForBlockAnnounceValidation::Allocated
},
Entry::Occupied(mut entry) => {
if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
*entry.get_mut() += 1;
AllocateSlotForBlockAnnounceValidation::Allocated
} else {
AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached
}
},
}
}
/// Should be called when a block announce validation is finished, to update the slots
/// of the peer that send the block announce.
fn deallocate_slot_for_block_announce_validation(&mut self, peer_id: &PeerId) {
match self.validations_per_peer.entry(*peer_id) {
Entry::Vacant(_) => {
error!(
target: LOG_TARGET,
"💔 Block announcement validation from peer {} finished for a slot that was not allocated!",
peer_id,
);
},
Entry::Occupied(mut entry) => match entry.get().checked_sub(1) {
Some(value) =>
if value == 0 {
entry.remove();
} else {
*entry.get_mut() = value;
},
None => {
entry.remove();
error!(
target: LOG_TARGET,
"Invalid (zero) block announce validation slot counter for peer {peer_id}.",
);
debug_assert!(
false,
"Invalid (zero) block announce validation slot counter for peer {peer_id}.",
);
},
},
}
}
}
impl<B: BlockT> Stream for BlockAnnounceValidator<B> {
type Item = BlockAnnounceValidationResult<B::Header>;
/// Poll for finished block announce validations. The stream never terminates.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let validation = futures::ready!(self.validations.poll_next_unpin(cx))
.expect("`FuturesStream` never terminates; qed");
self.deallocate_slot_for_block_announce_validation(validation.peer_id());
Poll::Ready(Some(validation))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::block_announce_validator::AllocateSlotForBlockAnnounceValidation;
use libp2p::PeerId;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use substrate_test_runtime_client::runtime::Block;
#[test]
fn allocate_one_validation_slot() {
let mut validator =
BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
let peer_id = PeerId::random();
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::Allocated,
));
}
#[test]
fn allocate_validation_slots_for_two_peers() {
let mut validator =
BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
let peer_id_1 = PeerId::random();
let peer_id_2 = PeerId::random();
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id_1),
AllocateSlotForBlockAnnounceValidation::Allocated,
));
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id_2),
AllocateSlotForBlockAnnounceValidation::Allocated,
));
}
#[test]
fn maximum_validation_slots_per_peer() {
let mut validator =
BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
let peer_id = PeerId::random();
for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::Allocated,
));
}
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
));
}
#[test]
fn validation_slots_per_peer_deallocated() {
let mut validator =
BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
let peer_id = PeerId::random();
for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::Allocated,
));
}
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::MaximumPeerSlotsReached,
));
validator.deallocate_slot_for_block_announce_validation(&peer_id);
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::Allocated,
));
}
#[test]
fn maximum_validation_slots_for_all_peers() {
let mut validator =
BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));
for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
validator.validations.push(
futures::future::ready(BlockAnnounceValidationResult::Skip {
peer_id: PeerId::random(),
})
.boxed(),
);
}
let peer_id = PeerId::random();
assert!(matches!(
validator.allocate_slot_for_block_announce_validation(&peer_id),
AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached,
));
}
}
+76 -77
View File
@@ -20,6 +20,9 @@
//! to tip and keep the blockchain up to date with network updates.
use crate::{
block_announce_validator::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
service::{self, chain_sync::ToServiceCommand},
ChainSync, ClientError, SyncingService,
};
@@ -45,7 +48,7 @@ use sc_network_common::{
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
warp::WarpSyncParams,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent,
},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
@@ -239,6 +242,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Number of inbound peers accepted so far.
num_in_peers: usize,
/// Async processor of block announce validations.
block_announce_validator: BlockAnnounceValidatorStream<B>,
/// A cache for the data that was associated to a block announcement.
block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,
@@ -352,7 +358,6 @@ where
protocol_id,
fork_id,
roles,
block_announce_validator,
max_parallel_downloads,
max_blocks_per_request,
warp_sync_params,
@@ -389,6 +394,9 @@ where
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
block_announce_protocol_name,
block_announce_validator: BlockAnnounceValidatorStream::new(
block_announce_validator,
),
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
service_rx,
@@ -453,9 +461,9 @@ where
}
}
fn update_peer_info(&mut self, who: &PeerId) {
if let Some(info) = self.chain_sync.peer_info(who) {
if let Some(ref mut peer) = self.peers.get_mut(who) {
fn update_peer_info(&mut self, peer_id: &PeerId) {
if let Some(info) = self.chain_sync.peer_info(peer_id) {
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
peer.info.best_hash = info.best_hash;
peer.info.best_number = info.best_number;
}
@@ -463,14 +471,16 @@ where
}
/// Process the result of the block announce validation.
pub fn process_block_announce_validation_result(
fn process_block_announce_validation_result(
&mut self,
validation_result: PollBlockAnnounceValidation<B::Header>,
validation_result: BlockAnnounceValidationResult<B::Header>,
) {
match validation_result {
PollBlockAnnounceValidation::Skip => {},
PollBlockAnnounceValidation::Nothing { is_best: _, who, announce } => {
self.update_peer_info(&who);
BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
self.chain_sync.on_validated_block_announce(is_new_best, peer_id, &announce);
self.update_peer_info(&peer_id);
if let Some(data) = announce.data {
if !data.is_empty() {
@@ -478,41 +488,29 @@ where
}
}
},
PollBlockAnnounceValidation::Failure { who, disconnect } => {
BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
if disconnect {
self.network_service
.disconnect_peer(who, self.block_announce_protocol_name.clone());
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
}
self.network_service.report_peer(who, rep::BAD_BLOCK_ANNOUNCEMENT);
self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
},
}
}
/// Push a block announce validation.
///
/// It is required that [`ChainSync::poll_block_announce_validation`] is
/// called later to check for finished validations. The result of the validation
/// needs to be passed to [`SyncingEngine::process_block_announce_validation_result`]
/// to finish the processing.
///
/// # Note
///
/// This will internally create a future, but this future will not be registered
/// in the task before being polled once. So, it is required to call
/// [`ChainSync::poll_block_announce_validation`] to ensure that the future is
/// registered properly and will wake up the task when being ready.
pub fn push_block_announce_validation(
&mut self,
who: PeerId,
peer_id: PeerId,
announce: BlockAnnounce<B::Header>,
) {
let hash = announce.header.hash();
let peer = match self.peers.get_mut(&who) {
let peer = match self.peers.get_mut(&peer_id) {
Some(p) => p,
None => {
log::error!(target: "sync", "Received block announce from disconnected peer {}", who);
log::error!(target: "sync", "Received block announce from disconnected peer {}", peer_id);
debug_assert!(false);
return
},
@@ -525,7 +523,8 @@ where
BlockState::Normal => false,
};
self.chain_sync.push_block_announce_validation(who, hash, announce, is_best);
self.block_announce_validator
.push_block_announce_validation(peer_id, hash, announce, is_best);
}
}
@@ -558,10 +557,10 @@ where
.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
.unwrap_or_default();
for (who, ref mut peer) in self.peers.iter_mut() {
for (peer_id, ref mut peer) in self.peers.iter_mut() {
let inserted = peer.known_blocks.insert(hash);
if inserted {
log::trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
log::trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id);
let message = BlockAnnounce {
header: header.clone(),
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
@@ -656,14 +655,14 @@ where
}
}
},
ToServiceCommand::JustificationImported(peer, hash, number, success) => {
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
if !success {
log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer, hash);
log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer_id, hash);
self.network_service
.disconnect_peer(peer, self.block_announce_protocol_name.clone());
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(
peer,
peer_id,
ReputationChange::new_fatal("Invalid justification"),
);
}
@@ -698,8 +697,11 @@ where
let _ = tx.send(self.chain_sync.num_sync_requests());
},
ToServiceCommand::PeersInfo(tx) => {
let peers_info =
self.peers.iter().map(|(id, peer)| (*id, peer.info.clone())).collect();
let peers_info = self
.peers
.iter()
.map(|(peer_id, peer)| (*peer_id, peer.info.clone()))
.collect();
let _ = tx.send(peers_info);
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
@@ -742,14 +744,6 @@ where
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.last_notification_io = Instant::now();
self.push_block_announce_validation(remote, announce);
// Make sure that the newly added block announce validation future
// was polled once to be registered in the task.
if let Poll::Ready(res) =
self.chain_sync.poll_block_announce_validation(cx)
{
self.process_block_announce_validation_result(res)
}
} else {
log::warn!(target: "sub-libp2p", "Failed to decode block announce");
}
@@ -770,10 +764,14 @@ where
}
}
// poll `ChainSync` last because of a block announcement was received through the
// event stream between `SyncingEngine` and `Protocol` and the validation finished
// right after it as queued, the resulting block request (if any) can be sent right away.
while let Poll::Ready(result) = self.chain_sync.poll(cx) {
// Drive `ChainSync`.
while let Poll::Ready(()) = self.chain_sync.poll(cx) {}
// Poll block announce validations last, because if a block announcement was received
// through the event stream between `SyncingEngine` and `Protocol` and the validation
// finished right after it is queued, the resulting block request (if any) can be sent
// right away.
while let Poll::Ready(Some(result)) = self.block_announce_validator.poll_next_unpin(cx) {
self.process_block_announce_validation_result(result);
}
@@ -783,15 +781,15 @@ where
/// Called by peer when it is disconnecting.
///
/// Returns a result if the handshake of this peer was indeed accepted.
pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> {
if let Some(info) = self.peers.remove(&peer) {
if self.important_peers.contains(&peer) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer);
pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> {
if let Some(info) = self.peers.remove(&peer_id) {
if self.important_peers.contains(&peer_id) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer_id);
} else {
log::debug!(target: "sync", "{} disconnected", peer);
log::debug!(target: "sync", "{} disconnected", peer_id);
}
if !self.default_peers_set_no_slot_connected_peers.remove(&peer) &&
if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
info.inbound && info.info.roles.is_full()
{
match self.num_in_peers.checked_sub(1) {
@@ -808,9 +806,10 @@ where
}
}
self.chain_sync.peer_disconnected(&peer);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer)).is_ok());
self.chain_sync.peer_disconnected(&peer_id);
self.event_streams.retain(|stream| {
stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok()
});
Ok(())
} else {
Err(())
@@ -824,35 +823,35 @@ where
/// from.
pub fn on_sync_peer_connected(
&mut self,
who: PeerId,
peer_id: PeerId,
status: &BlockAnnouncesHandshake<B>,
sink: NotificationsSink,
inbound: bool,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", who, status);
log::trace!(target: "sync", "New peer {} {:?}", peer_id, status);
if self.peers.contains_key(&who) {
log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", who);
if self.peers.contains_key(&peer_id) {
log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", peer_id);
debug_assert!(false);
return Err(())
}
if status.genesis_hash != self.genesis_hash {
self.network_service.report_peer(who, rep::GENESIS_MISMATCH);
self.network_service.report_peer(peer_id, rep::GENESIS_MISMATCH);
if self.important_peers.contains(&who) {
if self.important_peers.contains(&peer_id) {
log::error!(
target: "sync",
"Reserved peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
peer_id,
self.genesis_hash,
status.genesis_hash,
);
} else if self.boot_node_ids.contains(&who) {
} else if self.boot_node_ids.contains(&peer_id) {
log::error!(
target: "sync",
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
peer_id,
self.genesis_hash,
status.genesis_hash,
);
@@ -867,7 +866,7 @@ where
return Err(())
}
let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&who);
let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
// make sure to accept no more than `--in-peers` many full nodes
@@ -875,7 +874,7 @@ where
status.roles.is_full() &&
inbound && self.num_in_peers == self.max_in_peers
{
log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {who}");
log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {peer_id}");
return Err(())
}
@@ -885,7 +884,7 @@ where
self.default_peers_set_no_slot_connected_peers.len() +
this_peer_reserved_slot
{
log::debug!(target: "sync", "Too many full nodes, rejecting {}", who);
log::debug!(target: "sync", "Too many full nodes, rejecting {}", peer_id);
return Err(())
}
@@ -893,7 +892,7 @@ where
(self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light
{
// Make sure that not all slots are occupied by light clients.
log::debug!(target: "sync", "Too many light nodes, rejecting {}", who);
log::debug!(target: "sync", "Too many light nodes, rejecting {}", peer_id);
return Err(())
}
@@ -911,7 +910,7 @@ where
};
let req = if peer.info.roles.is_full() {
match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) {
match self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number) {
Ok(req) => req,
Err(BadPeer(id, repu)) => {
self.network_service.report_peer(id, repu);
@@ -922,22 +921,22 @@ where
None
};
log::debug!(target: "sync", "Connected {}", who);
log::debug!(target: "sync", "Connected {}", peer_id);
self.peers.insert(who, peer);
self.peers.insert(peer_id, peer);
if no_slot_peer {
self.default_peers_set_no_slot_connected_peers.insert(who);
self.default_peers_set_no_slot_connected_peers.insert(peer_id);
} else if inbound && status.roles.is_full() {
self.num_in_peers += 1;
}
if let Some(req) = req {
self.chain_sync.send_block_request(who, req);
self.chain_sync.send_block_request(peer_id, req);
}
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(who)).is_ok());
.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
Ok(())
}
@@ -0,0 +1,134 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! A wrapper for [`FuturesUnordered`] that wakes the task up once a new future is pushed
//! for it to be polled automatically. It's [`Stream`] never terminates.
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use std::{
pin::Pin,
task::{Context, Poll, Waker},
};
/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
pub struct FuturesStream<F> {
futures: FuturesUnordered<F>,
waker: Option<Waker>,
}
/// Surprizingly, `#[derive(Default)]` doesn't work on [`FuturesStream`].
impl<F> Default for FuturesStream<F> {
fn default() -> FuturesStream<F> {
FuturesStream { futures: Default::default(), waker: None }
}
}
impl<F> FuturesStream<F> {
/// Push a future for processing.
pub fn push(&mut self, future: F) {
self.futures.push(future);
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
/// The number of futures in the stream.
pub fn len(&self) -> usize {
self.futures.len()
}
}
impl<F: Future> Stream for FuturesStream<F> {
type Item = <F as Future>::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
self.waker = Some(cx.waker().clone());
return Poll::Pending
};
Poll::Ready(Some(result))
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::future::{BoxFuture, FutureExt};
/// [`Stream`] implementation for [`FuturesStream`] relies on the undocumented
/// feature that [`FuturesUnordered`] can be polled and repeatedly yield
/// `Poll::Ready(None)` before any futures are added into it.
#[tokio::test]
async fn empty_futures_unordered_can_be_polled() {
let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();
futures::future::poll_fn(|cx| {
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
Poll::Ready(())
})
.await;
}
/// [`Stream`] implementation for [`FuturesStream`] relies on the undocumented
/// feature that [`FuturesUnordered`] can be polled and repeatedly yield
/// `Poll::Ready(None)` after all the futures in it have resolved.
#[tokio::test]
async fn deplenished_futures_unordered_can_be_polled() {
let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();
unordered.push(futures::future::ready(()).boxed());
assert_eq!(unordered.next().await, Some(()));
futures::future::poll_fn(|cx| {
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn empty_futures_stream_yields_pending() {
let mut stream = FuturesStream::<BoxFuture<()>>::default();
futures::future::poll_fn(|cx| {
assert_eq!(stream.poll_next_unpin(cx), Poll::Pending);
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn futures_stream_resolves_futures_and_yields_pending() {
let mut stream = FuturesStream::default();
stream.push(futures::future::ready(17));
futures::future::poll_fn(|cx| {
assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(17)));
assert_eq!(stream.poll_next_unpin(cx), Poll::Pending);
Poll::Ready(())
})
.await;
}
}
+94 -402
View File
@@ -37,9 +37,7 @@ use crate::{
use codec::{Decode, DecodeAll, Encode};
use extra_requests::ExtraRequests;
use futures::{
channel::oneshot, stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt,
};
use futures::{channel::oneshot, task::Poll, Future, FutureExt};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, info, trace, warn};
use prost::Message;
@@ -66,16 +64,12 @@ use sc_network_common::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams, WarpSyncPhase, WarpSyncProgress},
BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification,
OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest,
OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode,
SyncState, SyncStatus,
OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, SyncState, SyncStatus,
},
};
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{
block_validation::{BlockAnnounceValidator, Validation},
BlockOrigin, BlockStatus,
};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
traits::{
Block as BlockT, CheckedSub, Hash, HashingFor, Header as HeaderT, NumberFor, One,
@@ -85,7 +79,7 @@ use sp_runtime::{
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{HashMap, HashSet},
iter,
ops::Range,
pin::Pin,
@@ -94,7 +88,9 @@ use std::{
pub use service::chain_sync::SyncingService;
mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod schema;
pub mod block_request_handler;
@@ -117,17 +113,6 @@ const MAX_DOWNLOAD_AHEAD: u32 = 2048;
/// common block of a node.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
/// Maximum number of concurrent block announce validations.
///
/// If the queue reaches the maximum, we drop any new block
/// announcements.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS: usize = 256;
/// Maximum number of concurrent block announce validations per peer.
///
/// See [`MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS`] for more information.
const MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER: usize = 4;
/// Pick the state to sync as the latest finalized number minus this.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
@@ -307,19 +292,12 @@ pub struct ChainSync<B: BlockT, Client> {
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
/// A set of peers for which there might be potential block requests
allowed_requests: AllowedRequests,
/// A type to check incoming block announcements.
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
/// Maximum number of peers to ask the same blocks in parallel.
max_parallel_downloads: u32,
/// Maximum blocks per request.
max_blocks_per_request: u32,
/// Total number of downloaded blocks.
downloaded_blocks: usize,
/// All block announcement that are currently being validated.
block_announce_validation:
FuturesUnordered<Pin<Box<dyn Future<Output = PreValidateBlockAnnounce<B::Header>> + Send>>>,
/// Stats per peer about the number of concurrent block announce validations.
block_announce_validation_per_peer_stats: HashMap<PeerId, usize>,
/// State sync in progress, if any.
state_sync: Option<StateSync<B, Client>>,
/// Warp sync in progress, if any.
@@ -424,51 +402,6 @@ impl<B: BlockT> PeerSyncState<B> {
}
}
/// Result of [`ChainSync::block_announce_validation`].
#[derive(Debug, Clone, PartialEq, Eq)]
enum PreValidateBlockAnnounce<H> {
/// The announcement failed at validation.
///
/// The peer reputation should be decreased.
Failure {
/// Who sent the processed block announcement?
who: PeerId,
/// Should the peer be disconnected?
disconnect: bool,
},
/// The pre-validation was sucessful and the announcement should be
/// further processed.
Process {
/// Is this the new best block of the peer?
is_new_best: bool,
/// The id of the peer that send us the announcement.
who: PeerId,
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The announcement validation returned an error.
///
/// An error means that *this* node failed to validate it because some internal error happened.
/// If the block announcement was invalid, [`Self::Failure`] is the correct variant to express
/// this.
Error { who: PeerId },
/// The block announcement should be skipped.
///
/// This should *only* be returned when there wasn't a slot registered
/// for this block announcement validation.
Skip,
}
/// Result of [`ChainSync::has_slot_for_block_announce_validation`].
enum HasSlotForBlockAnnounceValidation {
/// Yes, there is a slot for the block announce validation.
Yes,
/// We reached the total maximum number of validation slots.
TotalMaximumSlotsReached,
/// We reached the maximum number of validation slots for the given peer.
MaximumPeerSlotsReached,
}
impl<B, Client> ChainSyncT<B> for ChainSync<B, Client>
where
B: BlockT,
@@ -692,7 +625,7 @@ where
self.extra_justifications.reset();
}
// The implementation is similar to on_block_announce with unknown parent hash.
// The implementation is similar to `on_validated_block_announce` with unknown parent hash.
fn set_sync_fork_request(
&mut self,
mut peers: Vec<PeerId>,
@@ -1107,119 +1040,88 @@ where
}
}
fn push_block_announce_validation(
fn on_validated_block_announce(
&mut self,
who: PeerId,
hash: B::Hash,
announce: BlockAnnounce<B::Header>,
is_best: bool,
who: PeerId,
announce: &BlockAnnounce<B::Header>,
) {
let header = &announce.header;
let number = *header.number();
debug!(
target: "sync",
"Pre-validating received block announcement {:?} with number {:?} from {}",
hash,
number,
who,
);
let number = *announce.header.number();
let hash = announce.header.hash();
let parent_status =
self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
let known_parent = parent_status != BlockStatus::Unknown;
let ancient_parent = parent_status == BlockStatus::InChainPruned;
if number.is_zero() {
self.block_announce_validation.push(
async move {
warn!(
target: "sync",
"💔 Ignored genesis block (#0) announcement from {}: {}",
who,
hash,
);
PreValidateBlockAnnounce::Skip
}
.boxed(),
let known = self.is_known(&hash);
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "💔 Called `on_validated_block_announce` with a bad peer ID");
return
};
if let PeerSyncState::AncestorSearch { .. } = peer.state {
trace!(target: "sync", "Peer {} is in the ancestor search state.", who);
return
}
if is_best {
// update their best block
peer.best_number = number;
peer.best_hash = hash;
}
// If the announced block is the best they have and is not ahead of us, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
if known && self.best_queued_number >= number {
self.update_peer_common_number(&who, number);
} else if announce.header.parent_hash() == &self.best_queued_hash ||
known_parent && self.best_queued_number >= number
{
self.update_peer_common_number(&who, number.saturating_sub(One::one()));
}
}
self.allowed_requests.add(&who);
// known block case
if known || self.is_already_downloading(&hash) {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
if let Some(target) = self.fork_targets.get_mut(&hash) {
target.peers.insert(who);
}
return
}
if ancient_parent {
trace!(
target: "sync",
"Ignored ancient block announced from {}: {} {:?}",
who,
hash,
announce.header,
);
return
}
// Check if there is a slot for this block announce validation.
match self.has_slot_for_block_announce_validation(&who) {
HasSlotForBlockAnnounceValidation::Yes => {},
HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => {
self.block_announce_validation.push(
async move {
warn!(
target: "sync",
"💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.",
number,
hash,
who,
);
PreValidateBlockAnnounce::Skip
}
.boxed(),
);
return
},
HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => {
self.block_announce_validation.push(async move {
warn!(
target: "sync",
"💔 Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.",
number,
hash,
who,
);
PreValidateBlockAnnounce::Skip
}.boxed());
return
},
}
// Let external validator check the block announcement.
let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
let future = self.block_announce_validator.validate(header, assoc_data);
self.block_announce_validation.push(
async move {
match future.await {
Ok(Validation::Success { is_new_best }) => PreValidateBlockAnnounce::Process {
is_new_best: is_new_best || is_best,
announce,
who,
},
Ok(Validation::Failure { disconnect }) => {
debug!(
target: "sync",
"Block announcement validation of block {:?} from {} failed",
hash,
who,
);
PreValidateBlockAnnounce::Failure { who, disconnect }
},
Err(e) => {
debug!(
target: "sync",
"💔 Block announcement validation of block {:?} errored: {}",
hash,
e,
);
PreValidateBlockAnnounce::Error { who }
},
}
}
.boxed(),
);
}
fn poll_block_announce_validation(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<B::Header>> {
match self.block_announce_validation.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => {
self.peer_block_announce_validation_finished(&res);
Poll::Ready(self.finish_block_announce_validation(res))
},
_ => Poll::Pending,
if self.status().state == SyncState::Idle {
trace!(
target: "sync",
"Added sync target for block announced from {}: {} {:?}",
who,
hash,
announce.summary(),
);
self.fork_targets
.entry(hash)
.or_insert_with(|| ForkTarget {
number,
parent_hash: Some(*announce.header.parent_hash()),
peers: Default::default(),
})
.peers
.insert(who);
}
}
@@ -1319,10 +1221,7 @@ where
.map_err(|error: codec::Error| error.to_string())
}
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<B::Header>> {
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> {
// Should be called before `process_outbound_requests` to ensure
// that a potential target block is directly leading to requests.
if let Some(warp_sync) = &mut self.warp_sync {
@@ -1339,10 +1238,6 @@ where
}
}
if let Poll::Ready(announce) = self.poll_block_announce_validation(cx) {
return Poll::Ready(announce)
}
Poll::Pending
}
@@ -1395,7 +1290,6 @@ where
protocol_id: ProtocolId,
fork_id: &Option<String>,
roles: Roles,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
max_blocks_per_request: u32,
warp_sync_params: Option<WarpSyncParams<B>>,
@@ -1430,12 +1324,9 @@ where
queue_blocks: Default::default(),
fork_targets: Default::default(),
allowed_requests: Default::default(),
block_announce_validator,
max_parallel_downloads,
max_blocks_per_request,
downloaded_blocks: 0,
block_announce_validation: Default::default(),
block_announce_validation_per_peer_stats: Default::default(),
state_sync: None,
warp_sync: None,
import_existing: false,
@@ -1586,186 +1477,6 @@ where
self.allowed_requests.set_all();
}
/// Checks if there is a slot for a block announce validation.
///
/// The total number and the number per peer of concurrent block announce validations
/// is capped.
///
/// Returns [`HasSlotForBlockAnnounceValidation`] to inform about the result.
///
/// # Note
///
/// It is *required* to call [`Self::peer_block_announce_validation_finished`] when the
/// validation is finished to clear the slot.
fn has_slot_for_block_announce_validation(
&mut self,
peer: &PeerId,
) -> HasSlotForBlockAnnounceValidation {
if self.block_announce_validation.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
return HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
}
match self.block_announce_validation_per_peer_stats.entry(*peer) {
Entry::Vacant(entry) => {
entry.insert(1);
HasSlotForBlockAnnounceValidation::Yes
},
Entry::Occupied(mut entry) => {
if *entry.get() < MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS_PER_PEER {
*entry.get_mut() += 1;
HasSlotForBlockAnnounceValidation::Yes
} else {
HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached
}
},
}
}
/// Should be called when a block announce validation is finished, to update the slots
/// of the peer that send the block announce.
fn peer_block_announce_validation_finished(
&mut self,
res: &PreValidateBlockAnnounce<B::Header>,
) {
let peer = match res {
PreValidateBlockAnnounce::Failure { who, .. } |
PreValidateBlockAnnounce::Process { who, .. } |
PreValidateBlockAnnounce::Error { who } => who,
PreValidateBlockAnnounce::Skip => return,
};
match self.block_announce_validation_per_peer_stats.entry(*peer) {
Entry::Vacant(_) => {
error!(
target: "sync",
"💔 Block announcement validation from peer {} finished for that no slot was allocated!",
peer,
);
},
Entry::Occupied(mut entry) => {
*entry.get_mut() = entry.get().saturating_sub(1);
if *entry.get() == 0 {
entry.remove();
}
},
}
}
/// This will finish processing of the block announcement.
fn finish_block_announce_validation(
&mut self,
pre_validation_result: PreValidateBlockAnnounce<B::Header>,
) -> PollBlockAnnounceValidation<B::Header> {
let (announce, is_best, who) = match pre_validation_result {
PreValidateBlockAnnounce::Failure { who, disconnect } => {
debug!(
target: "sync",
"Failed announce validation: {:?}, disconnect: {}",
who,
disconnect,
);
return PollBlockAnnounceValidation::Failure { who, disconnect }
},
PreValidateBlockAnnounce::Process { announce, is_new_best, who } =>
(announce, is_new_best, who),
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => {
debug!(
target: "sync",
"Ignored announce validation",
);
return PollBlockAnnounceValidation::Skip
},
};
trace!(
target: "sync",
"Finished block announce validation: from {:?}: {:?}. local_best={}",
who,
announce.summary(),
is_best,
);
let number = *announce.header.number();
let hash = announce.header.hash();
let parent_status =
self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
let known_parent = parent_status != BlockStatus::Unknown;
let ancient_parent = parent_status == BlockStatus::InChainPruned;
let known = self.is_known(&hash);
let peer = if let Some(peer) = self.peers.get_mut(&who) {
peer
} else {
error!(target: "sync", "💔 Called on_block_announce with a bad peer ID");
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
};
if let PeerSyncState::AncestorSearch { .. } = peer.state {
trace!(target: "sync", "Peer state is ancestor search.");
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}
if is_best {
// update their best block
peer.best_number = number;
peer.best_hash = hash;
}
// If the announced block is the best they have and is not ahead of us, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
if known && self.best_queued_number >= number {
self.update_peer_common_number(&who, number);
} else if announce.header.parent_hash() == &self.best_queued_hash ||
known_parent && self.best_queued_number >= number
{
self.update_peer_common_number(&who, number - One::one());
}
}
self.allowed_requests.add(&who);
// known block case
if known || self.is_already_downloading(&hash) {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
if let Some(target) = self.fork_targets.get_mut(&hash) {
target.peers.insert(who);
}
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}
if ancient_parent {
trace!(
target: "sync",
"Ignored ancient block announced from {}: {} {:?}",
who,
hash,
announce.header,
);
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}
if self.status().state == SyncState::Idle {
trace!(
target: "sync",
"Added sync target for block announced from {}: {} {:?}",
who,
hash,
announce.summary(),
);
self.fork_targets
.entry(hash)
.or_insert_with(|| ForkTarget {
number,
parent_hash: Some(*announce.header.parent_hash()),
peers: Default::default(),
})
.peers
.insert(who);
}
PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}
/// Restart the sync process. This will reset all pending block requests and return an iterator
/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
/// their state was `DownloadingJustification`) are unaffected and will stay in the same state.
@@ -3162,14 +2873,13 @@ fn validate_blocks<Block: BlockT>(
mod test {
use super::*;
use crate::service::network::NetworkServiceProvider;
use futures::{executor::block_on, future::poll_fn};
use futures::executor::block_on;
use sc_block_builder::BlockBuilderProvider;
use sc_network_common::{
role::Role,
sync::message::{BlockData, BlockState, FromBlock},
sync::message::{BlockAnnounce, BlockData, BlockState, FromBlock},
};
use sp_blockchain::HeaderBackend;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use substrate_test_runtime_client::{
runtime::{Block, Hash, Header},
BlockBuilderExt, ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt, TestClient,
@@ -3183,7 +2893,6 @@ mod test {
// internally we should process the response as the justification not being available.
let client = Arc::new(TestClientBuilder::new().build());
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let peer_id = PeerId::random();
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
@@ -3195,7 +2904,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
block_announce_validator,
1,
64,
None,
@@ -3262,7 +2970,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
1,
64,
None,
@@ -3344,23 +3051,16 @@ mod test {
/// Send a block annoucnement for the given `header`.
fn send_block_announce(
header: Header,
peer_id: &PeerId,
peer_id: PeerId,
sync: &mut ChainSync<Block, TestClient>,
) {
let block_annnounce = BlockAnnounce {
let announce = BlockAnnounce {
header: header.clone(),
state: Some(BlockState::Best),
data: Some(Vec::new()),
};
sync.push_block_announce_validation(*peer_id, header.hash(), block_annnounce, true);
// Poll until we have procssed the block announcement
block_on(poll_fn(|cx| loop {
if sync.poll_block_announce_validation(cx).is_pending() {
break Poll::Ready(())
}
}))
sync.on_validated_block_announce(true, peer_id, &announce);
}
/// Create a block response from the given `blocks`.
@@ -3444,7 +3144,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
@@ -3491,7 +3190,7 @@ mod test {
assert!(sync.block_requests().is_empty());
// Let peer2 announce a fork of block 3
send_block_announce(block3_fork.header().clone(), &peer_id2, &mut sync);
send_block_announce(block3_fork.header().clone(), peer_id2, &mut sync);
// Import and tell sync that we now have the fork.
block_on(client.import(BlockOrigin::Own, block3_fork.clone())).unwrap();
@@ -3500,13 +3199,13 @@ mod test {
let block4 = build_block_at(block3_fork.hash(), false);
// Let peer2 announce block 4 and check that sync wants to get the block.
send_block_announce(block4.header().clone(), &peer_id2, &mut sync);
send_block_announce(block4.header().clone(), peer_id2, &mut sync);
let request = get_block_request(&mut sync, FromBlock::Hash(block4.hash()), 2, &peer_id2);
// Peer1 announces the same block, but as the common block is still `1`, sync will request
// block 2 again.
send_block_announce(block4.header().clone(), &peer_id1, &mut sync);
send_block_announce(block4.header().clone(), peer_id1, &mut sync);
let request2 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id1);
@@ -3571,7 +3270,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
@@ -3647,7 +3345,7 @@ mod test {
sync.queue_blocks.clear();
// Let peer2 announce that it finished syncing
send_block_announce(best_block.header().clone(), &peer_id2, &mut sync);
send_block_announce(best_block.header().clone(), peer_id2, &mut sync);
let (peer1_req, peer2_req) =
sync.block_requests().into_iter().fold((None, None), |res, req| {
@@ -3729,7 +3427,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
@@ -3754,7 +3451,7 @@ mod test {
sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number())
.unwrap();
send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync);
send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);
let mut request =
get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1);
@@ -3872,7 +3569,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
5,
64,
None,
@@ -3897,7 +3593,7 @@ mod test {
sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number())
.unwrap();
send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync);
send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);
let mut request =
get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1);
@@ -4017,7 +3713,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
1,
64,
None,
@@ -4039,7 +3734,7 @@ mod test {
// Create a "new" header and announce it
let mut header = blocks[0].header().clone();
header.number = 4;
send_block_announce(header, &peer_id1, &mut sync);
send_block_announce(header, peer_id1, &mut sync);
assert!(sync.fork_targets.len() == 1);
sync.peer_disconnected(&peer_id1);
@@ -4063,7 +3758,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
Box::new(DefaultBlockAnnounceValidator),
1,
64,
None,
@@ -4107,7 +3801,6 @@ mod test {
#[test]
fn sync_restart_removes_block_but_not_justification_requests() {
let mut client = Arc::new(TestClientBuilder::new().build());
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
@@ -4117,7 +3810,6 @@ mod test {
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
block_announce_validator,
1,
64,
None,
+5 -10
View File
@@ -24,7 +24,7 @@ use libp2p::PeerId;
use sc_network_common::sync::{
message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse},
BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification,
OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, SyncStatus,
OpaqueBlockResponse, PeerInfo, SyncStatus,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
@@ -71,17 +71,12 @@ mockall::mock! {
success: bool,
);
fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor<Block>);
fn push_block_announce_validation(
fn on_validated_block_announce(
&mut self,
who: PeerId,
hash: Block::Hash,
announce: BlockAnnounce<Block::Header>,
is_best: bool,
who: PeerId,
announce: &BlockAnnounce<Block::Header>,
);
fn poll_block_announce_validation<'a>(
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn peer_disconnected(&mut self, who: &PeerId);
fn metrics(&self) -> Metrics;
fn block_response_into_blocks(
@@ -92,7 +87,7 @@ mockall::mock! {
fn poll<'a>(
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
) -> Poll<()>;
fn send_block_request(
&mut self,
who: PeerId,
+10
View File
@@ -123,6 +123,11 @@ impl<T> TracingUnboundedSender<T> {
s
})
}
/// The number of elements in the channel (proxy function to [`async_channel::Sender`]).
pub fn len(&self) -> usize {
self.inner.len()
}
}
impl<T> TracingUnboundedReceiver<T> {
@@ -139,6 +144,11 @@ impl<T> TracingUnboundedReceiver<T> {
s
})
}
/// The number of elements in the channel (proxy function to [`async_channel::Receiver`]).
pub fn len(&self) -> usize {
self.inner.len()
}
}
impl<T> Drop for TracingUnboundedReceiver<T> {