Sync: Fix issue of not freeing a block announcement slot (#8006)

* Sync: Fix issue of not freeing a block announcement slot

There was a bug that when the block announcement validation returned an
error, the slot reserved for this validation wasn't freed. This could
lead to a situation where we rejected any block announcement from such a
peer for that the block announcement returned an error multiple times.

* Better logging

* Fuck I'm dumb

* 🤦
This commit is contained in:
Bastian Köcher
2021-01-29 17:29:25 +01:00
committed by GitHub
parent 92cde30078
commit d40f9c166f
3 changed files with 87 additions and 11 deletions
+45 -11
View File
@@ -400,7 +400,18 @@ enum PreValidateBlockAnnounce<H> {
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The announcement validation returned an error.
///
/// An error means that *this* node failed to validate it because some internal error happened.
/// If the block announcement was invalid, [`Self::Failure`] is the correct variant to express
/// this.
Error {
who: PeerId,
},
/// The block announcement should be skipped.
///
/// This should *only* be returned when there wasn't a slot registered
/// for this block announcement validation.
Skip,
}
@@ -1223,6 +1234,11 @@ impl<B: BlockT> ChainSync<B> {
/// is capped.
///
/// Returns [`HasSlotForBlockAnnounceValidation`] to inform about the result.
///
/// # Note
///
/// It is *required* to call [`Self::peer_block_announce_validation_finished`] when the
/// validation is finished to clear the slot.
fn has_slot_for_block_announce_validation(&mut self, peer: &PeerId) -> HasSlotForBlockAnnounceValidation {
if self.block_announce_validation.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
return HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
@@ -1324,15 +1340,20 @@ impl<B: BlockT> ChainSync<B> {
Ok(Validation::Failure { disconnect }) => {
debug!(
target: "sync",
"Block announcement validation of block {} from {} failed",
"Block announcement validation of block {:?} from {} failed",
hash,
who,
);
PreValidateBlockAnnounce::Failure { who, disconnect }
}
Err(e) => {
error!(target: "sync", "💔 Block announcement validation errored: {}", e);
PreValidateBlockAnnounce::Skip
error!(
target: "sync",
"💔 Block announcement validation of block {:?} errored: {}",
hash,
e,
);
PreValidateBlockAnnounce::Error { who }
}
}
}.boxed());
@@ -1352,14 +1373,27 @@ impl<B: BlockT> ChainSync<B> {
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<B::Header>> {
match self.block_announce_validation.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => Poll::Ready(self.finish_block_announce_validation(res)),
Poll::Ready(Some(res)) => {
self.peer_block_announce_validation_finished(&res);
Poll::Ready(self.finish_block_announce_validation(res))
},
_ => Poll::Pending,
}
}
/// Should be called when a block announce validation was finished, to update the stats
/// of the given peer.
fn peer_block_announce_validation_finished(&mut self, peer: &PeerId) {
/// Should be called when a block announce validation is finished, to update the slots
/// of the peer that send the block announce.
fn peer_block_announce_validation_finished(
&mut self,
res: &PreValidateBlockAnnounce<B::Header>,
) {
let peer = match res {
PreValidateBlockAnnounce::Failure { who, .. } |
PreValidateBlockAnnounce::Process { who, .. } |
PreValidateBlockAnnounce::Error { who } => who,
PreValidateBlockAnnounce::Skip => return,
};
match self.block_announce_validation_per_peer_stats.entry(peer.clone()) {
Entry::Vacant(_) => {
error!(
@@ -1369,7 +1403,8 @@ impl<B: BlockT> ChainSync<B> {
);
},
Entry::Occupied(mut entry) => {
if entry.get_mut().saturating_sub(1) == 0 {
*entry.get_mut() = entry.get().saturating_sub(1);
if *entry.get() == 0 {
entry.remove();
}
}
@@ -1389,14 +1424,13 @@ impl<B: BlockT> ChainSync<B> {
let (announce, is_best, who) = match pre_validation_result {
PreValidateBlockAnnounce::Failure { who, disconnect } => {
self.peer_block_announce_validation_finished(&who);
return PollBlockAnnounceValidation::Failure { who, disconnect }
},
PreValidateBlockAnnounce::Process { announce, is_new_best, who } => {
self.peer_block_announce_validation_finished(&who);
(announce, is_new_best, who)
},
PreValidateBlockAnnounce::Skip => return PollBlockAnnounceValidation::Skip,
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip =>
return PollBlockAnnounceValidation::Skip,
};
let number = *announce.header.number();
+38
View File
@@ -897,3 +897,41 @@ fn block_announce_data_is_propagated() {
net.block_until_idle();
}
}
#[test]
fn continue_to_sync_after_some_block_announcement_verifications_failed() {
struct TestBlockAnnounceValidator;
impl BlockAnnounceValidator<Block> for TestBlockAnnounceValidator {
fn validate(
&mut self,
header: &Header,
_: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, Box<dyn std::error::Error + Send>>> + Send>> {
let number = *header.number();
async move {
if number < 100 {
Err(Box::<dyn std::error::Error + Send + Sync>::from(String::from("error")) as Box<_>)
} else {
Ok(Validation::Success { is_new_best: false })
}
}.boxed()
}
}
sp_tracing::try_init_simple();
let mut net = TestNet::new(1);
net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(TestBlockAnnounceValidator)),
..Default::default()
});
net.block_until_connected();
net.block_until_idle();
let block_hash = net.peer(0).push_blocks(500, true);
net.block_until_sync();
assert!(net.peer(1).has_block(&block_hash));
}
@@ -59,6 +59,10 @@ pub trait BlockAnnounceValidator<B: Block> {
///
/// Returning [`Validation::Failure`] will lead to a decrease of the
/// peers reputation as it sent us invalid data.
///
/// The returned future should only resolve to an error iff there was an internal error validating
/// the block announcement. If the block announcement itself is invalid, this should *always*
/// return [`Validation::Failure`].
fn validate(
&mut self,
header: &B::Header,