Support synching of blocks that are not new_best (#6508)

* Start

* Remove debug println

* Add tests
This commit is contained in:
Bastian Köcher
2020-06-30 16:44:52 +02:00
committed by GitHub
parent 4fd770dcf9
commit 53be6ec510
9 changed files with 172 additions and 53 deletions
@@ -59,7 +59,7 @@ fn import_single_good_block_works() {
&mut substrate_test_runtime_client::new(),
BlockOrigin::File,
block,
&mut PassThroughVerifier(true)
&mut PassThroughVerifier::new(true)
) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number && *aux == expected_aux && *org == Some(peer_id) => {}
@@ -74,7 +74,7 @@ fn import_single_good_known_block_is_ignored() {
&mut client,
BlockOrigin::File,
block,
&mut PassThroughVerifier(true)
&mut PassThroughVerifier::new(true)
) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {}
_ => panic!()
@@ -89,7 +89,7 @@ fn import_single_good_block_without_header_fails() {
&mut substrate_test_runtime_client::new(),
BlockOrigin::File,
block,
&mut PassThroughVerifier(true)
&mut PassThroughVerifier::new(true)
) {
Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {}
_ => panic!()
@@ -101,7 +101,7 @@ fn async_import_queue_drops() {
let executor = sp_core::testing::SpawnBlockingExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let verifier = PassThroughVerifier::new(true);
let queue = BasicQueue::new(
verifier,
+82 -10
View File
@@ -39,7 +39,7 @@ use sc_client_api::{
use sc_consensus::LongestChain;
use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
use sc_network::config::Role;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use sp_consensus::block_validation::{DefaultBlockAnnounceValidator, BlockAnnounceValidator};
use sp_consensus::import_queue::{
BasicQueue, BoxJustificationImport, Verifier, BoxFinalityProofImport,
};
@@ -67,7 +67,33 @@ type AuthorityId = sp_consensus_babe::AuthorityId;
/// A Verifier that accepts all blocks and passes them on with the configured
/// finality to be imported.
#[derive(Clone)]
pub struct PassThroughVerifier(pub bool);
pub struct PassThroughVerifier {
finalized: bool,
fork_choice: ForkChoiceStrategy,
}
impl PassThroughVerifier {
/// Create a new instance.
///
/// Every verified block will use `finalized` for the `BlockImportParams`.
pub fn new(finalized: bool) -> Self {
Self {
finalized,
fork_choice: ForkChoiceStrategy::LongestChain,
}
}
/// Create a new instance.
///
/// Every verified block will use `finalized` for the `BlockImportParams` and
/// the given [`ForkChoiceStrategy`].
pub fn new_with_fork_choice(finalized: bool, fork_choice: ForkChoiceStrategy) -> Self {
Self {
finalized,
fork_choice,
}
}
}
/// This `Verifier` accepts all data as valid.
impl<B: BlockT> Verifier<B> for PassThroughVerifier {
@@ -85,9 +111,9 @@ impl<B: BlockT> Verifier<B> for PassThroughVerifier {
.map(|blob| vec![(well_known_cache_keys::AUTHORITIES, blob.to_vec())]);
let mut import = BlockImportParams::new(origin, header);
import.body = body;
import.finalized = self.0;
import.finalized = self.finalized;
import.justification = justification;
import.fork_choice = Some(ForkChoiceStrategy::LongestChain);
import.fork_choice = Some(self.fork_choice.clone());
Ok((import, maybe_keys))
}
@@ -294,6 +320,7 @@ impl<D> Peer<D> {
} else {
Default::default()
};
self.block_import.import_block(import_block, cache).expect("block_import failed");
self.network.service().announce_block(hash, Vec::new());
at = hash;
@@ -519,6 +546,15 @@ impl<B: BlockT> VerifierAdapter<B> {
}
}
/// Configuration for a full peer.
#[derive(Default)]
pub struct FullPeerConfig {
/// Pruning window size.
pub keep_blocks: Option<u32>,
/// Block announce validator.
pub block_announce_validator: Option<Box<dyn BlockAnnounceValidator<Block> + Send + Sync>>,
}
pub trait TestNetFactory: Sized {
type Verifier: 'static + Verifier<Block>;
type PeerData: Default;
@@ -579,12 +615,12 @@ pub trait TestNetFactory: Sized {
}
fn add_full_peer(&mut self) {
self.add_full_peer_with_states(None)
self.add_full_peer_with_config(Default::default())
}
/// Add a full peer.
fn add_full_peer_with_states(&mut self, keep_blocks: Option<u32>) {
let test_client_builder = match keep_blocks {
fn add_full_peer_with_config(&mut self, config: FullPeerConfig) {
let test_client_builder = match config.keep_blocks {
Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks),
None => TestClientBuilder::with_default_backend(),
};
@@ -641,7 +677,8 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
block_announce_validator: config.block_announce_validator
.unwrap_or(Box::new(DefaultBlockAnnounceValidator)),
metrics_registry: None,
}).unwrap();
@@ -720,7 +757,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
@@ -787,6 +824,20 @@ pub trait TestNetFactory: Sized {
Poll::Ready(())
}
/// Polls the testnet until all peers are connected to each other.
///
/// Must be executed in a task context.
fn poll_until_connected(&mut self, cx: &mut FutureContext) -> Poll<()> {
self.poll(cx);
let num_peers = self.peers().len();
if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) {
return Poll::Ready(())
}
Poll::Pending
}
/// Blocks the current thread until we are sync'ed.
///
/// Calls `poll_until_sync` repeatedly.
@@ -801,6 +852,15 @@ pub trait TestNetFactory: Sized {
futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| self.poll_until_idle(cx)));
}
/// Blocks the current thread until all peers are connected to each other.
///
/// Calls `poll_until_connected` repeatedly with the runtime passed as parameter.
fn block_until_connected(&mut self) {
futures::executor::block_on(
futures::future::poll_fn::<(), _>(|cx| self.poll_until_connected(cx)),
);
}
/// Polls the testnet. Processes all the pending actions and returns `NotReady`.
fn poll(&mut self, cx: &mut FutureContext) {
self.mut_peers(|peers| {
@@ -831,6 +891,17 @@ pub trait TestNetFactory: Sized {
pub struct TestNet {
peers: Vec<Peer<()>>,
fork_choice: ForkChoiceStrategy,
}
impl TestNet {
/// Create a `TestNet` that used the given fork choice rule.
pub fn with_fork_choice(fork_choice: ForkChoiceStrategy) -> Self {
Self {
peers: Vec::new(),
fork_choice,
}
}
}
impl TestNetFactory for TestNet {
@@ -841,13 +912,14 @@ impl TestNetFactory for TestNet {
fn from_config(_config: &ProtocolConfig) -> Self {
TestNet {
peers: Vec::new(),
fork_choice: ForkChoiceStrategy::LongestChain,
}
}
fn make_verifier(&self, _client: PeersClient, _config: &ProtocolConfig, _peer_data: &())
-> Self::Verifier
{
PassThroughVerifier(false)
PassThroughVerifier::new_with_fork_choice(false, self.fork_choice.clone())
}
fn peer(&mut self, i: usize) -> &mut Peer<()> {
+44 -4
View File
@@ -20,6 +20,8 @@ use sp_consensus::BlockOrigin;
use std::time::Duration;
use futures::executor::block_on;
use super::*;
use sp_consensus::block_validation::Validation;
use substrate_test_runtime::Header;
fn test_ancestor_search_when_common_is(n: usize) {
let _ = ::env_logger::try_init();
@@ -582,10 +584,10 @@ fn can_sync_explicit_forks() {
#[test]
fn syncs_header_only_forks() {
let _ = ::env_logger::try_init();
let _ = env_logger::try_init();
let mut net = TestNet::new(0);
net.add_full_peer_with_states(None);
net.add_full_peer_with_states(Some(3));
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(FullPeerConfig { keep_blocks: Some(3), ..Default::default() });
net.peer(0).push_blocks(2, false);
net.peer(1).push_blocks(2, false);
@@ -683,7 +685,7 @@ fn imports_stale_once() {
#[test]
fn can_sync_to_peers_with_wrong_common_block() {
let _ = ::env_logger::try_init();
let _ = env_logger::try_init();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(2, true);
@@ -710,3 +712,41 @@ fn can_sync_to_peers_with_wrong_common_block() {
assert!(net.peer(1).client().header(&BlockId::Hash(final_hash)).unwrap().is_some());
}
/// Returns `is_new_best = true` for each validated announcement.
struct NewBestBlockAnnounceValidator;
impl BlockAnnounceValidator<Block> for NewBestBlockAnnounceValidator {
fn validate(
&mut self,
_: &Header,
_: &[u8],
) -> Result<Validation, Box<dyn std::error::Error + Send>> {
Ok(Validation::Success { is_new_best: true })
}
}
#[test]
fn sync_blocks_when_block_announce_validator_says_it_is_new_best() {
let _ = env_logger::try_init();
log::trace!(target: "sync", "Test");
let mut net = TestNet::with_fork_choice(ForkChoiceStrategy::Custom(false));
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(NewBestBlockAnnounceValidator)),
..Default::default()
});
net.block_until_connected();
let block_hash = net.peer(0).push_blocks(1, false);
while !net.peer(2).has_block(&block_hash) {
net.block_until_idle();
}
// Peer1 should not have the block, because peer 0 did not reported the block
// as new best. However, peer2 has a special block announcement validator
// that flags all blocks as `is_new_best` and thus, it should have synced the blocks.
assert!(!net.peer(1).has_block(&block_hash));
}