mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Fixed sync skipping some block announcements (#8459)
* Fixed sync missing some block announcements * Apply suggestions from code review Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
@@ -144,6 +144,26 @@ pub struct RemoteReadResponse {
|
||||
pub proof: StorageProof,
|
||||
}
|
||||
|
||||
/// Announcement summary used for debug logging.
|
||||
#[derive(Debug)]
|
||||
pub struct AnnouncementSummary<H: HeaderT> {
|
||||
block_hash: H::Hash,
|
||||
number: H::Number,
|
||||
parent_hash: H::Hash,
|
||||
state: Option<BlockState>,
|
||||
}
|
||||
|
||||
impl<H: HeaderT> generic::BlockAnnounce<H> {
|
||||
pub fn summary(&self) -> AnnouncementSummary<H> {
|
||||
AnnouncementSummary {
|
||||
block_hash: self.header.hash(),
|
||||
number: *self.header.number(),
|
||||
parent_hash: self.header.parent_hash().clone(),
|
||||
state: self.state,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic types.
|
||||
pub mod generic {
|
||||
use bitflags::bitflags;
|
||||
|
||||
@@ -505,9 +505,10 @@ impl<B: BlockT> ChainSync<B> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of active sync requests.
|
||||
/// Number of active forks requests. This includes
|
||||
/// requests that are pending or could be issued right away.
|
||||
pub fn num_sync_requests(&self) -> usize {
|
||||
self.fork_targets.len()
|
||||
self.fork_targets.values().filter(|f| f.number <= self.best_queued_number).count()
|
||||
}
|
||||
|
||||
/// Number of downloaded blocks.
|
||||
@@ -1421,23 +1422,36 @@ impl<B: BlockT> ChainSync<B> {
|
||||
&mut self,
|
||||
pre_validation_result: PreValidateBlockAnnounce<B::Header>,
|
||||
) -> PollBlockAnnounceValidation<B::Header> {
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Finished block announce validation: {:?}",
|
||||
pre_validation_result,
|
||||
);
|
||||
|
||||
let (announce, is_best, who) = match pre_validation_result {
|
||||
PreValidateBlockAnnounce::Failure { who, disconnect } => {
|
||||
debug!(
|
||||
target: "sync",
|
||||
"Failed announce validation: {:?}, disconnect: {}",
|
||||
who,
|
||||
disconnect,
|
||||
);
|
||||
return PollBlockAnnounceValidation::Failure { who, disconnect }
|
||||
},
|
||||
PreValidateBlockAnnounce::Process { announce, is_new_best, who } => {
|
||||
(announce, is_new_best, who)
|
||||
},
|
||||
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip =>
|
||||
return PollBlockAnnounceValidation::Skip,
|
||||
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => {
|
||||
debug!(
|
||||
target: "sync",
|
||||
"Ignored announce validation",
|
||||
);
|
||||
return PollBlockAnnounceValidation::Skip
|
||||
},
|
||||
};
|
||||
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Finished block announce validation: from {:?}: {:?}. local_best={}",
|
||||
who,
|
||||
announce.summary(),
|
||||
is_best,
|
||||
);
|
||||
|
||||
let number = *announce.header.number();
|
||||
let hash = announce.header.hash();
|
||||
let parent_status = self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
|
||||
@@ -1508,25 +1522,22 @@ impl<B: BlockT> ChainSync<B> {
|
||||
return PollBlockAnnounceValidation::ImportHeader { is_best, announce, who }
|
||||
}
|
||||
|
||||
if number <= self.best_queued_number {
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Added sync target for block announced from {}: {} {:?}",
|
||||
who,
|
||||
hash,
|
||||
announce.header,
|
||||
);
|
||||
self.fork_targets
|
||||
.entry(hash.clone())
|
||||
.or_insert_with(|| ForkTarget {
|
||||
number,
|
||||
parent_hash: Some(*announce.header.parent_hash()),
|
||||
peers: Default::default(),
|
||||
})
|
||||
.peers.insert(who.clone());
|
||||
}
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Added sync target for block announced from {}: {} {:?}",
|
||||
who,
|
||||
hash,
|
||||
announce.summary(),
|
||||
);
|
||||
self.fork_targets
|
||||
.entry(hash.clone())
|
||||
.or_insert_with(|| ForkTarget {
|
||||
number,
|
||||
parent_hash: Some(*announce.header.parent_hash()),
|
||||
peers: Default::default(),
|
||||
})
|
||||
.peers.insert(who.clone());
|
||||
|
||||
trace!(target: "sync", "Announce validation result is nothing");
|
||||
PollBlockAnnounceValidation::Nothing { is_best, who, announce }
|
||||
}
|
||||
|
||||
|
||||
@@ -489,6 +489,11 @@ impl<D, B> Peer<D, B> where
|
||||
&self.network.service()
|
||||
}
|
||||
|
||||
/// Get a reference to the network worker.
|
||||
pub fn network(&self) -> &NetworkWorker<Block, <Block as BlockT>::Hash> {
|
||||
&self.network
|
||||
}
|
||||
|
||||
/// Test helper to compare the blockchain state of multiple (networked)
|
||||
/// clients.
|
||||
pub fn blockchain_canon_equals(&self, other: &Self) -> bool {
|
||||
@@ -985,12 +990,12 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
|
||||
/// Polls the testnet. Processes all the pending actions.
|
||||
fn poll(&mut self, cx: &mut FutureContext) {
|
||||
self.mut_peers(|peers| {
|
||||
for peer in peers {
|
||||
trace!(target: "sync", "-- Polling {}", peer.id());
|
||||
for (i, peer) in peers.into_iter().enumerate() {
|
||||
trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
|
||||
if let Poll::Ready(()) = peer.network.poll_unpin(cx) {
|
||||
panic!("NetworkWorker has terminated unexpectedly.")
|
||||
}
|
||||
trace!(target: "sync", "-- Polling complete {}", peer.id());
|
||||
trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());
|
||||
|
||||
// We poll `imported_blocks_stream`.
|
||||
while let Poll::Ready(Some(notification)) = peer.imported_blocks_stream.as_mut().poll_next(cx) {
|
||||
|
||||
@@ -740,6 +740,27 @@ impl BlockAnnounceValidator<Block> for NewBestBlockAnnounceValidator {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `Validation::Failure` for specified block number
|
||||
struct FailingBlockAnnounceValidator(u64);
|
||||
|
||||
impl BlockAnnounceValidator<Block> for FailingBlockAnnounceValidator {
|
||||
fn validate(
|
||||
&mut self,
|
||||
header: &Header,
|
||||
_: &[u8],
|
||||
) -> Pin<Box<dyn Future<Output = Result<Validation, Box<dyn std::error::Error + Send>>> + Send>> {
|
||||
let number = *header.number();
|
||||
let target_number = self.0;
|
||||
async move { Ok(
|
||||
if number == target_number {
|
||||
Validation::Failure { disconnect: false }
|
||||
} else {
|
||||
Validation::Success { is_new_best: true }
|
||||
}
|
||||
) }.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_blocks_when_block_announce_validator_says_it_is_new_best() {
|
||||
sp_tracing::try_init_simple();
|
||||
@@ -1010,3 +1031,59 @@ fn multiple_requests_are_accepted_as_long_as_they_are_not_fulfilled() {
|
||||
Poll::Ready(())
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn syncs_all_forks_from_single_peer() {
|
||||
sp_tracing::try_init_simple();
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer(0).push_blocks(10, false);
|
||||
net.peer(1).push_blocks(10, false);
|
||||
|
||||
// poll until the two nodes connect, otherwise announcing the block will not work
|
||||
net.block_until_connected();
|
||||
|
||||
// Peer 0 produces new blocks and announces.
|
||||
let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true);
|
||||
|
||||
// Wait till peer 1 starts downloading
|
||||
block_on(futures::future::poll_fn::<(), _>(|cx| {
|
||||
net.poll(cx);
|
||||
if net.peer(1).network().best_seen_block() != Some(12) {
|
||||
return Poll::Pending
|
||||
}
|
||||
Poll::Ready(())
|
||||
}));
|
||||
|
||||
// Peer 0 produces and announces another fork
|
||||
let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false);
|
||||
|
||||
net.block_until_sync();
|
||||
|
||||
// Peer 1 should have both branches,
|
||||
assert!(net.peer(1).client().header(&BlockId::Hash(branch1)).unwrap().is_some());
|
||||
assert!(net.peer(1).client().header(&BlockId::Hash(branch2)).unwrap().is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn syncs_after_missing_announcement() {
|
||||
sp_tracing::try_init_simple();
|
||||
let mut net = TestNet::new(0);
|
||||
net.add_full_peer_with_config(Default::default());
|
||||
// Set peer 1 to ignore announcement
|
||||
net.add_full_peer_with_config(FullPeerConfig {
|
||||
block_announce_validator: Some(Box::new(FailingBlockAnnounceValidator(11))),
|
||||
..Default::default()
|
||||
});
|
||||
net.peer(0).push_blocks(10, false);
|
||||
net.peer(1).push_blocks(10, false);
|
||||
|
||||
net.block_until_connected();
|
||||
|
||||
// Peer 0 produces a new block and announces. Peer 1 ignores announcement.
|
||||
net.peer(0).push_blocks_at(BlockId::Number(10), 1, false);
|
||||
// Peer 0 produces another block and announces.
|
||||
let final_block = net.peer(0).push_blocks_at(BlockId::Number(11), 1, false);
|
||||
net.peer(1).push_blocks_at(BlockId::Number(10), 1, true);
|
||||
net.block_until_sync();
|
||||
assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user