Fix a bunch of low work dependency cycles (#4354)

* take test-client off sp-consensus

* use test primitives rather than test client in authority discovery tests

* move runtime-interface tests

* don't forget to remove the dev-dependency

* remove more unneeded dev deps

* add changes_trie_config to test prrimitives

* Separate network crates from its integration tests

* Fix up consensus crates for networking test changes

* remove unnecessary dependencies

* remove unused addition

* remove unnecessary dev-dependencies

* fixing finality grandpa tests

* removing unnecessary executor dependencies
This commit is contained in:
Benjamin Kampmann
2019-12-11 10:27:34 +01:00
committed by GitHub
parent ed50be1eb5
commit 605c0e655e
30 changed files with 249 additions and 161 deletions
@@ -0,0 +1,89 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Testing block import logic.
use consensus::ImportedAux;
use consensus::import_queue::{
import_single_block, BasicQueue, BlockImportError, BlockImportResult, IncomingBlock,
};
use test_client::{self, prelude::*};
use test_client::runtime::{Block, Hash};
use sp_runtime::generic::BlockId;
use super::*;
fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock<Block>) {
let client = test_client::new();
let block = client.new_block(Default::default()).unwrap().bake().unwrap();
client.import(BlockOrigin::File, block).unwrap();
let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1);
let header = client.header(&BlockId::Number(1)).unwrap();
let justification = client.justification(&BlockId::Number(1)).unwrap();
let peer_id = PeerId::random();
(client, hash, number, peer_id.clone(), IncomingBlock {
hash,
header,
body: Some(Vec::new()),
justification,
origin: Some(peer_id.clone()),
allow_missing_state: false,
import_existing: false,
})
}
#[test]
fn import_single_good_block_works() {
let (_, _hash, number, peer_id, block) = prepare_good_block();
let mut expected_aux = ImportedAux::default();
expected_aux.is_new_best = true;
match import_single_block(&mut test_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number && *aux == expected_aux && *org == Some(peer_id) => {}
r @ _ => panic!("{:?}", r)
}
}
#[test]
fn import_single_good_known_block_is_ignored() {
let (mut client, _hash, number, _, block) = prepare_good_block();
match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number => {}
_ => panic!()
}
}
#[test]
fn import_single_good_block_without_header_fails() {
let (_, _, _, peer_id, mut block) = prepare_good_block();
block.header = None;
match import_single_block(&mut test_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {}
_ => panic!()
}
}
#[test]
fn async_import_queue_drops() {
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let queue = BasicQueue::new(verifier, Box::new(test_client::new()), None, None);
drop(queue);
}
}
+823
View File
@@ -0,0 +1,823 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
#![allow(missing_docs)]
#[cfg(test)]
mod block_import;
#[cfg(test)]
mod sync;
use std::collections::HashMap;
use std::sync::Arc;
use libp2p::build_multiaddr;
use log::trace;
use sc_network::FinalityProofProvider;
use sp_blockchain::{
Result as ClientResult, well_known_cache_keys::{self, Id as CacheKeyId},
};
use client_api::{
ClientInfo, BlockchainEvents, BlockImportNotification,
FinalityNotifications, ImportNotifications,
FinalityNotification,
backend::{AuxStore, Backend, Finalizer}
};
use block_builder::BlockBuilder;
use client::LongestChain;
use sc_network::config::Roles;
use consensus::block_validation::DefaultBlockAnnounceValidator;
use consensus::import_queue::BasicQueue;
use consensus::import_queue::{
BoxBlockImport, BoxJustificationImport, Verifier, BoxFinalityProofImport,
};
use consensus::block_import::{BlockImport, ImportResult};
use consensus::Error as ConsensusError;
use consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
use futures::prelude::*;
use futures03::{StreamExt as _, TryStreamExt as _};
use sc_network::{NetworkWorker, NetworkService, ReportHandle, config::ProtocolId};
use sc_network::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder};
use libp2p::PeerId;
use parking_lot::Mutex;
use primitives::H256;
use sc_network::{Context, ProtocolConfig};
use sp_runtime::generic::{BlockId, OpaqueDigestItemId};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use sp_runtime::Justification;
use sc_network::TransactionPool;
use sc_network::specialization::NetworkSpecialization;
use test_client::{self, AccountKeyring};
pub use test_client::runtime::{Block, Extrinsic, Hash, Transfer};
pub use test_client::{TestClient, TestClientBuilder, TestClientBuilderExt};
type AuthorityId = babe_primitives::AuthorityId;
/// A Verifier that accepts all blocks and passes them on with the configured
/// finality to be imported.
#[derive(Clone)]
pub struct PassThroughVerifier(pub bool);
/// This `Verifier` accepts all data as valid.
impl<B: BlockT> Verifier<B> for PassThroughVerifier {
fn verify(
&mut self,
origin: BlockOrigin,
header: B::Header,
justification: Option<Justification>,
body: Option<Vec<B::Extrinsic>>
) -> Result<(BlockImportParams<B>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
let maybe_keys = header.digest()
.log(|l| l.try_as_raw(OpaqueDigestItemId::Consensus(b"aura"))
.or_else(|| l.try_as_raw(OpaqueDigestItemId::Consensus(b"babe")))
)
.map(|blob| vec![(well_known_cache_keys::AUTHORITIES, blob.to_vec())]);
Ok((BlockImportParams {
origin,
header,
body,
finalized: self.0,
justification,
post_digests: vec![],
auxiliary: Vec::new(),
fork_choice: ForkChoiceStrategy::LongestChain,
allow_missing_state: false,
import_existing: false,
}, maybe_keys))
}
}
/// The test specialization.
#[derive(Clone)]
pub struct DummySpecialization;
impl NetworkSpecialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> {
vec![]
}
fn on_connect(
&mut self,
_ctx: &mut dyn Context<Block>,
_peer_id: PeerId,
_status: sc_network::message::Status<Block>
) {}
fn on_disconnect(&mut self, _ctx: &mut dyn Context<Block>, _peer_id: PeerId) {}
fn on_message(
&mut self,
_ctx: &mut dyn Context<Block>,
_peer_id: PeerId,
_message: Vec<u8>,
) {}
}
pub type PeersFullClient =
client::Client<test_client::Backend, test_client::Executor, Block, test_client::runtime::RuntimeApi>;
pub type PeersLightClient =
client::Client<test_client::LightBackend, test_client::LightExecutor, Block, test_client::runtime::RuntimeApi>;
#[derive(Clone)]
pub enum PeersClient {
Full(Arc<PeersFullClient>, Arc<test_client::Backend>),
Light(Arc<PeersLightClient>, Arc<test_client::LightBackend>),
}
impl PeersClient {
pub fn as_full(&self) -> Option<Arc<PeersFullClient>> {
match *self {
PeersClient::Full(ref client, ref _backend) => Some(client.clone()),
_ => None,
}
}
pub fn as_block_import(&self) -> BoxBlockImport<Block> {
match *self {
PeersClient::Full(ref client, ref _backend) => Box::new(client.clone()) as _,
PeersClient::Light(ref client, ref _backend) => Box::new(client.clone()) as _,
}
}
pub fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.get_aux(key),
PeersClient::Light(ref client, ref _backend) => client.get_aux(key),
}
}
pub fn info(&self) -> ClientInfo<Block> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.info(),
PeersClient::Light(ref client, ref _backend) => client.info(),
}
}
pub fn header(&self, block: &BlockId<Block>) -> ClientResult<Option<<Block as BlockT>::Header>> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.header(block),
PeersClient::Light(ref client, ref _backend) => client.header(block),
}
}
pub fn justification(&self, block: &BlockId<Block>) -> ClientResult<Option<Justification>> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.justification(block),
PeersClient::Light(ref client, ref _backend) => client.justification(block),
}
}
pub fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.finality_notification_stream(),
PeersClient::Light(ref client, ref _backend) => client.finality_notification_stream(),
}
}
pub fn import_notification_stream(&self) -> ImportNotifications<Block>{
match *self {
PeersClient::Full(ref client, ref _backend) => client.import_notification_stream(),
PeersClient::Light(ref client, ref _backend) => client.import_notification_stream(),
}
}
pub fn finalize_block(
&self,
id: BlockId<Block>,
justification: Option<Justification>,
notify: bool
) -> ClientResult<()> {
match *self {
PeersClient::Full(ref client, ref _backend) => client.finalize_block(id, justification, notify),
PeersClient::Light(ref client, ref _backend) => client.finalize_block(id, justification, notify),
}
}
}
pub struct Peer<D, S: NetworkSpecialization<Block>> {
pub data: D,
client: PeersClient,
/// We keep a copy of the verifier so that we can invoke it for locally-generated blocks,
/// instead of going through the import queue.
verifier: VerifierAdapter<dyn Verifier<Block>>,
/// We keep a copy of the block_import so that we can invoke it for locally-generated blocks,
/// instead of going through the import queue.
block_import: Box<dyn BlockImport<Block, Error = ConsensusError>>,
select_chain: Option<LongestChain<test_client::Backend, Block>>,
backend: Option<Arc<test_client::Backend>>,
network: NetworkWorker<Block, S, <Block as BlockT>::Hash>,
imported_blocks_stream: Box<dyn Stream<Item = BlockImportNotification<Block>, Error = ()> + Send>,
finality_notification_stream: Box<dyn Stream<Item = FinalityNotification<Block>, Error = ()> + Send>,
}
impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
/// Get this peer ID.
pub fn id(&self) -> PeerId {
self.network.service().local_peer_id()
}
/// Returns true if we're major syncing.
pub fn is_major_syncing(&self) -> bool {
self.network.service().is_major_syncing()
}
// Returns a clone of the local SelectChain, only available on full nodes
pub fn select_chain(&self) -> Option<LongestChain<test_client::Backend, Block>> {
self.select_chain.clone()
}
/// Returns the number of peers we're connected to.
pub fn num_peers(&self) -> usize {
self.network.num_connected_peers()
}
/// Returns true if we have no peer.
pub fn is_offline(&self) -> bool {
self.num_peers() == 0
}
/// Request a justification for the given block.
pub fn request_justification(&self, hash: &<Block as BlockT>::Hash, number: NumberFor<Block>) {
self.network.service().request_justification(hash, number);
}
/// Announces an important block on the network.
pub fn announce_block(&self, hash: <Block as BlockT>::Hash, data: Vec<u8>) {
self.network.service().announce_block(hash, data);
}
/// Request explicit fork sync.
pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: <Block as BlockT>::Hash, number: NumberFor<Block>) {
self.network.service().set_sync_fork_request(peers, hash, number);
}
/// Add blocks to the peer -- edit the block before adding
pub fn generate_blocks<F>(&mut self, count: usize, origin: BlockOrigin, edit_block: F) -> H256
where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block
{
let best_hash = self.client.info().chain.best_hash;
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block)
}
/// Add blocks to the peer -- edit the block before adding. The chain will
/// start at the given block iD.
fn generate_blocks_at<F>(
&mut self,
at: BlockId<Block>,
count: usize,
origin: BlockOrigin,
mut edit_block: F
) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block {
let full_client = self.client.as_full().expect("blocks could only be generated by full clients");
let mut at = full_client.header(&at).unwrap().unwrap().hash();
for _ in 0..count {
let builder = full_client.new_block_at(&BlockId::Hash(at), Default::default()
).unwrap();
let block = edit_block(builder);
let hash = block.header.hash();
trace!(
target: "test_network",
"Generating {}, (#{}, parent={})",
hash,
block.header.number,
block.header.parent_hash
);
let header = block.header.clone();
let (import_block, cache) = self.verifier.verify(
origin,
header.clone(),
None,
Some(block.extrinsics)
).unwrap();
let cache = if let Some(cache) = cache {
cache.into_iter().collect()
} else {
Default::default()
};
self.block_import.import_block(import_block, cache).expect("block_import failed");
self.network.on_block_imported(hash, header, Vec::new(), true);
at = hash;
}
self.network.service().announce_block(at.clone(), Vec::new());
at
}
/// Push blocks to the peer (simplified: with or without a TX)
pub fn push_blocks(&mut self, count: usize, with_tx: bool) -> H256 {
let best_hash = self.client.info().chain.best_hash;
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
}
/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash.
pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
let mut nonce = 0;
if with_tx {
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
let transfer = Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Alice.into(),
amount: 1,
nonce,
};
builder.push(transfer.into_signed_tx()).unwrap();
nonce = nonce + 1;
builder.bake().unwrap()
})
} else {
self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap())
}
}
pub fn push_authorities_change_block(&mut self, new_authorities: Vec<AuthorityId>) -> H256 {
self.generate_blocks(1, BlockOrigin::File, |mut builder| {
builder.push(Extrinsic::AuthoritiesChange(new_authorities.clone())).unwrap();
builder.bake().unwrap()
})
}
/// Get a reference to the client.
pub fn client(&self) -> &PeersClient {
&self.client
}
/// Get a reference to the network service.
pub fn network_service(&self) -> &Arc<NetworkService<Block, S, <Block as BlockT>::Hash>> {
&self.network.service()
}
/// Test helper to compare the blockchain state of multiple (networked)
/// clients.
/// Potentially costly, as it creates in-memory copies of both blockchains in order
/// to compare them. If you have easier/softer checks that are sufficient, e.g.
/// by using .info(), you should probably use it instead of this.
pub fn blockchain_canon_equals(&self, other: &Self) -> bool {
if let (Some(mine), Some(others)) = (self.backend.clone(), other.backend.clone()) {
mine.as_in_memory().blockchain()
.canon_equals_to(others.as_in_memory().blockchain())
} else {
false
}
}
/// Count the total number of imported blocks.
pub fn blocks_count(&self) -> u64 {
self.backend.as_ref().map(
|backend| backend.blocks_count()
).unwrap_or(0)
}
}
pub struct EmptyTransactionPool;
impl TransactionPool<Hash, Block> for EmptyTransactionPool {
fn transactions(&self) -> Vec<(Hash, Extrinsic)> {
Vec::new()
}
fn hash_of(&self, _transaction: &Extrinsic) -> Hash {
Hash::default()
}
fn import(
&self,
_report_handle: ReportHandle,
_who: PeerId,
_rep_change_good: sc_network::ReputationChange,
_rep_change_bad: sc_network::ReputationChange,
_transaction: Extrinsic
) {}
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
}
pub trait SpecializationFactory {
fn create() -> Self;
}
impl SpecializationFactory for DummySpecialization {
fn create() -> DummySpecialization {
DummySpecialization
}
}
/// Implements `BlockImport` on an `Arc<Mutex<impl BlockImport>>`. Used internally. Necessary to overcome the way the
/// `TestNet` trait is designed, more specifically `make_block_import` returning a `Box<BlockImport>` makes it
/// impossible to clone the underlying object.
struct BlockImportAdapter<T: ?Sized>(Arc<Mutex<Box<T>>>);
impl<T: ?Sized> Clone for BlockImportAdapter<T> {
fn clone(&self) -> Self {
BlockImportAdapter(self.0.clone())
}
}
impl<T: ?Sized + BlockImport<Block>> BlockImport<Block> for BlockImportAdapter<T> {
type Error = T::Error;
fn check_block(
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.0.lock().check_block(block)
}
fn import_block(
&mut self,
block: BlockImportParams<Block>,
cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
self.0.lock().import_block(block, cache)
}
}
/// Implements `Verifier` on an `Arc<Mutex<impl Verifier>>`. Used internally.
struct VerifierAdapter<T: ?Sized>(Arc<Mutex<Box<T>>>);
impl<T: ?Sized> Clone for VerifierAdapter<T> {
fn clone(&self) -> Self {
VerifierAdapter(self.0.clone())
}
}
impl<B: BlockT, T: ?Sized + Verifier<B>> Verifier<B> for VerifierAdapter<T> {
fn verify(
&mut self,
origin: BlockOrigin,
header: B::Header,
justification: Option<Justification>,
body: Option<Vec<B::Extrinsic>>
) -> Result<(BlockImportParams<B>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
self.0.lock().verify(origin, header, justification, body)
}
}
pub trait TestNetFactory: Sized {
type Specialization: NetworkSpecialization<Block> + SpecializationFactory;
type Verifier: 'static + Verifier<Block>;
type PeerData: Default;
/// These two need to be implemented!
fn from_config(config: &ProtocolConfig) -> Self;
fn make_verifier(
&self,
client: PeersClient,
config: &ProtocolConfig,
peer_data: &Self::PeerData,
) -> Self::Verifier;
/// Get reference to peer.
fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::Specialization>;
fn peers(&self) -> &Vec<Peer<Self::PeerData, Self::Specialization>>;
fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::Specialization>>)>(&mut self, closure: F);
/// Get custom block import handle for fresh client, along with peer data.
fn make_block_import(&self, client: PeersClient)
-> (
BoxBlockImport<Block>,
Option<BoxJustificationImport<Block>>,
Option<BoxFinalityProofImport<Block>>,
Option<BoxFinalityProofRequestBuilder<Block>>,
Self::PeerData,
)
{
(client.as_block_import(), None, None, None, Default::default())
}
/// Get finality proof provider (if supported).
fn make_finality_proof_provider(&self, _client: PeersClient) -> Option<Arc<dyn FinalityProofProvider<Block>>> {
None
}
fn default_config() -> ProtocolConfig {
ProtocolConfig::default()
}
/// Create new test network with this many peers.
fn new(n: usize) -> Self {
trace!(target: "test_network", "Creating test network");
let config = Self::default_config();
let mut net = Self::from_config(&config);
for i in 0..n {
trace!(target: "test_network", "Adding peer {}", i);
net.add_full_peer(&config);
}
net
}
fn add_full_peer(&mut self, config: &ProtocolConfig) {
self.add_full_peer_with_states(config, None)
}
/// Add a full peer.
fn add_full_peer_with_states(&mut self, config: &ProtocolConfig, keep_blocks: Option<u32>) {
let test_client_builder = match keep_blocks {
Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks),
None => TestClientBuilder::with_default_backend(),
};
let backend = test_client_builder.backend();
let (c, longest_chain) = test_client_builder.build_with_longest_chain();
let client = Arc::new(c);
let (
block_import,
justification_import,
finality_proof_import,
finality_proof_request_builder,
data,
) = self.make_block_import(PeersClient::Full(client.clone(), backend.clone()));
let block_import = BlockImportAdapter(Arc::new(Mutex::new(block_import)));
let verifier = self.make_verifier(
PeersClient::Full(client.clone(), backend.clone()),
config,
&data,
);
let verifier = VerifierAdapter(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));
let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
let network = NetworkWorker::new(sc_network::config::Params {
roles: config.roles,
network_config: NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: TransportConfig::MemoryOnly,
..NetworkConfiguration::default()
},
chain: client.clone(),
finality_proof_provider: self.make_finality_proof_provider(PeersClient::Full(client.clone(), backend.clone())),
finality_proof_request_builder,
on_demand: None,
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
specialization: self::SpecializationFactory::create(),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone()))
}).unwrap();
self.mut_peers(|peers| {
for peer in peers.iter_mut() {
peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone());
}
let imported_blocks_stream = Box::new(client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
let finality_notification_stream = Box::new(client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
peers.push(Peer {
data,
client: PeersClient::Full(client, backend.clone()),
select_chain: Some(longest_chain),
backend: Some(backend),
imported_blocks_stream,
finality_notification_stream,
block_import: Box::new(block_import),
verifier,
network,
});
});
}
/// Add a light peer.
fn add_light_peer(&mut self, config: &ProtocolConfig) {
let mut config = config.clone();
config.roles = Roles::LIGHT;
let (c, backend) = test_client::new_light();
let client = Arc::new(c);
let (
block_import,
justification_import,
finality_proof_import,
finality_proof_request_builder,
data,
) = self.make_block_import(PeersClient::Light(client.clone(), backend.clone()));
let block_import = BlockImportAdapter(Arc::new(Mutex::new(block_import)));
let verifier = self.make_verifier(
PeersClient::Light(client.clone(), backend.clone()),
&config,
&data,
);
let verifier = VerifierAdapter(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));
let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
let network = NetworkWorker::new(sc_network::config::Params {
roles: config.roles,
network_config: NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
transport: TransportConfig::MemoryOnly,
..NetworkConfiguration::default()
},
chain: client.clone(),
finality_proof_provider: self.make_finality_proof_provider(PeersClient::Light(client.clone(), backend.clone())),
finality_proof_request_builder,
on_demand: None,
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
specialization: self::SpecializationFactory::create(),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone()))
}).unwrap();
self.mut_peers(|peers| {
for peer in peers.iter_mut() {
peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone());
}
let imported_blocks_stream = Box::new(client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
let finality_notification_stream = Box::new(client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
peers.push(Peer {
data,
verifier,
select_chain: None,
backend: None,
block_import: Box::new(block_import),
client: PeersClient::Light(client, backend),
imported_blocks_stream,
finality_notification_stream,
network,
});
});
}
/// Polls the testnet until all nodes are in sync.
///
/// Must be executed in a task context.
fn poll_until_sync(&mut self) -> Async<()> {
self.poll();
// Return `NotReady` if there's a mismatch in the highest block number.
let mut highest = None;
for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Async::NotReady
}
match (highest, peer.client.info().chain.best_hash) {
(None, b) => highest = Some(b),
(Some(ref a), ref b) if a == b => {},
(Some(_), _) => return Async::NotReady,
}
}
Async::Ready(())
}
/// Blocks the current thread until we are sync'ed.
///
/// Calls `poll_until_sync` repeatidely with the runtime passed as parameter.
fn block_until_sync(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap();
}
/// Polls the testnet. Processes all the pending actions and returns `NotReady`.
fn poll(&mut self) {
self.mut_peers(|peers| {
for peer in peers {
trace!(target: "sync", "-- Polling {}", peer.id());
peer.network.poll().unwrap();
trace!(target: "sync", "-- Polling complete {}", peer.id());
// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
peer.network.on_block_imported(notification.hash, notification.header, Vec::new(), true);
}
// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Ok(Async::Ready(Some(item))) = peer.finality_notification_stream.poll() {
last = Some(item);
}
if let Some(notification) = last {
peer.network.on_block_finalized(notification.hash, notification.header);
}
}
});
}
}
pub struct TestNet {
peers: Vec<Peer<(), DummySpecialization>>,
}
impl TestNetFactory for TestNet {
type Specialization = DummySpecialization;
type Verifier = PassThroughVerifier;
type PeerData = ();
/// Create new test network with peers and given config.
fn from_config(_config: &ProtocolConfig) -> Self {
TestNet {
peers: Vec::new(),
}
}
fn make_verifier(&self, _client: PeersClient, _config: &ProtocolConfig, _peer_data: &())
-> Self::Verifier
{
PassThroughVerifier(false)
}
fn peer(&mut self, i: usize) -> &mut Peer<(), Self::Specialization> {
&mut self.peers[i]
}
fn peers(&self) -> &Vec<Peer<(), Self::Specialization>> {
&self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<Peer<(), Self::Specialization>>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
}
pub struct ForceFinalized(PeersClient);
impl JustificationImport<Block> for ForceFinalized {
type Error = ConsensusError;
fn import_justification(
&mut self,
hash: H256,
_number: NumberFor<Block>,
justification: Justification,
) -> Result<(), Self::Error> {
self.0.finalize_block(BlockId::Hash(hash), Some(justification), true)
.map_err(|_| ConsensusError::InvalidJustification.into())
}
}
pub struct JustificationTestNet(TestNet);
impl TestNetFactory for JustificationTestNet {
type Specialization = DummySpecialization;
type Verifier = PassThroughVerifier;
type PeerData = ();
fn from_config(config: &ProtocolConfig) -> Self {
JustificationTestNet(TestNet::from_config(config))
}
fn make_verifier(&self, client: PeersClient, config: &ProtocolConfig, peer_data: &()) -> Self::Verifier {
self.0.make_verifier(client, config, peer_data)
}
fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::Specialization> {
self.0.peer(i)
}
fn peers(&self) -> &Vec<Peer<Self::PeerData, Self::Specialization>> {
self.0.peers()
}
fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, Self::Specialization>>)>(&mut self, closure: F) {
self.0.mut_peers(closure)
}
fn make_block_import(&self, client: PeersClient)
-> (
BoxBlockImport<Block>,
Option<BoxJustificationImport<Block>>,
Option<BoxFinalityProofImport<Block>>,
Option<BoxFinalityProofRequestBuilder<Block>>,
Self::PeerData,
)
{
(client.as_block_import(), Some(Box::new(ForceFinalized(client))), None, None, Default::default())
}
}
+662
View File
@@ -0,0 +1,662 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use sc_network::config::Roles;
use consensus::BlockOrigin;
use futures03::TryFutureExt as _;
use std::time::Duration;
use tokio::runtime::current_thread;
use super::*;
fn test_ancestor_search_when_common_is(n: usize) {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(n, false);
net.peer(1).push_blocks(n, false);
net.peer(2).push_blocks(n, false);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
#[test]
fn sync_peers_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
for peer in 0..3 {
if net.peer(peer).num_peers() != 2 {
return Ok(Async::NotReady)
}
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn sync_cycle_from_offline_to_syncing_to_offline() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
for peer in 0..3 {
// Offline, and not major syncing.
assert!(net.peer(peer).is_offline());
assert!(!net.peer(peer).is_major_syncing());
}
// Generate blocks.
net.peer(2).push_blocks(100, false);
// Block until all nodes are online and nodes 0 and 1 and major syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
for peer in 0..3 {
// Online
if net.peer(peer).is_offline() {
return Ok(Async::NotReady)
}
if peer < 2 {
// Major syncing.
if !net.peer(peer).is_major_syncing() {
return Ok(Async::NotReady)
}
}
}
Ok(Async::Ready(()))
})).unwrap();
// Block until all nodes are done syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
for peer in 0..3 {
if net.peer(peer).is_major_syncing() {
return Ok(Async::NotReady)
}
}
Ok(Async::Ready(()))
})).unwrap();
// Now drop nodes 1 and 2, and check that node 0 is offline.
net.peers.remove(2);
net.peers.remove(1);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if !net.peer(0).is_offline() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
}
#[test]
fn syncing_node_not_major_syncing_when_disconnected() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
// Generate blocks.
net.peer(2).push_blocks(100, false);
// Check that we're not major syncing when disconnected.
assert!(!net.peer(1).is_major_syncing());
// Check that we switch to major syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if !net.peer(1).is_major_syncing() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
// Destroy two nodes, and check that we switch to non-major syncing.
net.peers.remove(2);
net.peers.remove(0);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).is_major_syncing() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
}
#[test]
fn sync_from_two_peers_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
assert!(!net.peer(0).is_major_syncing());
}
#[test]
fn sync_from_two_peers_with_ancestry_search_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
#[test]
fn ancestry_search_works_when_backoff_is_one() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(1, false);
net.peer(1).push_blocks(2, false);
net.peer(2).push_blocks(2, false);
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
#[test]
fn ancestry_search_works_when_ancestor_is_genesis() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(13, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
#[test]
fn ancestry_search_works_when_common_is_one() {
test_ancestor_search_when_common_is(1);
}
#[test]
fn ancestry_search_works_when_common_is_two() {
test_ancestor_search_when_common_is(2);
}
#[test]
fn ancestry_search_works_when_common_is_hundred() {
test_ancestor_search_when_common_is(100);
}
#[test]
fn sync_long_chain_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(1).push_blocks(500, false);
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
#[test]
fn sync_no_common_longer_chain_fails() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(20, true);
net.peer(1).push_blocks(20, false);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).is_major_syncing() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
let peer1 = &net.peers()[1];
assert!(!net.peers()[0].blockchain_canon_equals(peer1));
}
#[test]
fn sync_justifications() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = JustificationTestNet::new(3);
net.peer(0).push_blocks(20, false);
net.block_until_sync(&mut runtime);
// there's currently no justification for block #10
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None);
assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), None);
// we finalize block #10, #15 and #20 for peer 0 with a justification
net.peer(0).client().finalize_block(BlockId::Number(10), Some(Vec::new()), true).unwrap();
net.peer(0).client().finalize_block(BlockId::Number(15), Some(Vec::new()), true).unwrap();
net.peer(0).client().finalize_block(BlockId::Number(20), Some(Vec::new()), true).unwrap();
let h1 = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap();
let h2 = net.peer(1).client().header(&BlockId::Number(15)).unwrap().unwrap();
let h3 = net.peer(1).client().header(&BlockId::Number(20)).unwrap().unwrap();
// peer 1 should get the justifications from the network
net.peer(1).request_justification(&h1.hash().into(), 10);
net.peer(1).request_justification(&h2.hash().into(), 15);
net.peer(1).request_justification(&h3.hash().into(), 20);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
for height in (10..21).step_by(5) {
if net.peer(0).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) {
return Ok(Async::NotReady);
}
if net.peer(1).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) {
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn sync_justifications_across_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = JustificationTestNet::new(3);
// we push 5 blocks
net.peer(0).push_blocks(5, false);
// and then two forks 5 and 6 blocks long
let f1_best = net.peer(0).push_blocks_at(BlockId::Number(5), 5, false);
let f2_best = net.peer(0).push_blocks_at(BlockId::Number(5), 6, false);
// peer 1 will only see the longer fork. but we'll request justifications
// for both and finalize the small fork instead.
net.block_until_sync(&mut runtime);
net.peer(0).client().finalize_block(BlockId::Hash(f1_best), Some(Vec::new()), true).unwrap();
net.peer(1).request_justification(&f1_best, 10);
net.peer(1).request_justification(&f2_best, 11);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
if net.peer(0).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new()) &&
net.peer(1).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new())
{
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
})).unwrap();
}
#[test]
fn sync_after_fork_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
net.peer(2).push_blocks(30, false);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(20, false);
net.peer(2).push_blocks(20, false);
net.peer(1).push_blocks(10, true);
net.peer(2).push_blocks(1, false);
// peer 1 has the best chain
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
(net.peers()[1].blockchain_canon_equals(peer1));
(net.peers()[2].blockchain_canon_equals(peer1));
}
#[test]
fn syncs_all_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(4);
net.peer(0).push_blocks(2, false);
net.peer(1).push_blocks(2, false);
net.peer(0).push_blocks(2, true);
net.peer(1).push_blocks(4, false);
net.block_until_sync(&mut runtime);
// Check that all peers have all of the blocks.
assert_eq!(9, net.peer(0).blocks_count());
assert_eq!(9, net.peer(1).blocks_count());
}
#[test]
fn own_blocks_are_announced() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.block_until_sync(&mut runtime); // connect'em
net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.bake().unwrap());
net.block_until_sync(&mut runtime);
assert_eq!(net.peer(0).client.info().chain.best_number, 1);
assert_eq!(net.peer(1).client.info().chain.best_number, 1);
let peer0 = &net.peers()[0];
assert!(net.peers()[1].blockchain_canon_equals(peer0));
(net.peers()[2].blockchain_canon_equals(peer0));
}
#[test]
fn blocks_are_not_announced_by_light_nodes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(0);
// full peer0 is connected to light peer
// light peer1 is connected to full peer2
let mut light_config = ProtocolConfig::default();
light_config.roles = Roles::LIGHT;
net.add_full_peer(&ProtocolConfig::default());
net.add_light_peer(&light_config);
// Sync between 0 and 1.
net.peer(0).push_blocks(1, false);
assert_eq!(net.peer(0).client.info().chain.best_number, 1);
net.block_until_sync(&mut runtime);
assert_eq!(net.peer(1).client.info().chain.best_number, 1);
// Add another node and remove node 0.
net.add_full_peer(&ProtocolConfig::default());
net.peers.remove(0);
// Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together.
let mut delay = futures_timer::Delay::new(Duration::from_secs(5)).compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
delay.poll().map_err(|_| ())
})).unwrap();
assert_eq!(net.peer(1).client.info().chain.best_number, 0);
}
#[test]
fn can_sync_small_non_best_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
// small fork + reorg on peer 1.
net.peer(0).push_blocks_at(BlockId::Number(30), 2, true);
let small_hash = net.peer(0).client().info().chain.best_hash;
net.peer(0).push_blocks_at(BlockId::Number(30), 10, false);
assert_eq!(net.peer(0).client().info().chain.best_number, 40);
// peer 1 only ever had the long fork.
net.peer(1).push_blocks(10, false);
assert_eq!(net.peer(1).client().info().chain.best_number, 40);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
// poll until the two nodes connect, otherwise announcing the block will not work
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).num_peers() == 0 {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
assert_eq!(net.peer(0).client().info().chain.best_number, 40);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
net.peer(0).announce_block(small_hash, Vec::new());
// after announcing, peer 1 downloads the block.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
net.block_until_sync(&mut runtime);
let another_fork = net.peer(0).push_blocks_at(BlockId::Number(35), 2, true);
net.peer(0).announce_block(another_fork, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(1).client().header(&BlockId::Hash(another_fork)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn can_not_sync_from_light_peer() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
net.add_light_peer(&Default::default());
// generate some blocks on #0
net.peer(0).push_blocks(1, false);
// and let the light client sync from this node
net.block_until_sync(&mut runtime);
// ensure #0 && #1 have the same best block
let full0_info = net.peer(0).client.info().chain;
let light_info = net.peer(1).client.info().chain;
assert_eq!(full0_info.best_number, 1);
assert_eq!(light_info.best_number, 1);
assert_eq!(light_info.best_hash, full0_info.best_hash);
// add new full client (#2) && remove #0
net.add_full_peer(&Default::default());
net.peers.remove(0);
// ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5)).compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
test_finished.poll().map_err(|_| ())
})).unwrap();
}
#[test]
fn light_peer_imports_header_from_announce() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
fn import_with_announce(net: &mut TestNet, runtime: &mut current_thread::Runtime, hash: H256) {
net.peer(0).announce_block(hash, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
if net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
})).unwrap();
}
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
net.add_light_peer(&Default::default());
// let them connect to each other
net.block_until_sync(&mut runtime);
// check that NEW block is imported from announce message
let new_hash = net.peer(0).push_blocks(1, false);
import_with_announce(&mut net, &mut runtime, new_hash);
// check that KNOWN STALE block is imported from announce message
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, &mut runtime, known_stale_hash);
}
#[test]
fn can_sync_explicit_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
// small fork + reorg on peer 1.
net.peer(0).push_blocks_at(BlockId::Number(30), 2, true);
let small_hash = net.peer(0).client().info().chain.best_hash;
let small_number = net.peer(0).client().info().chain.best_number;
net.peer(0).push_blocks_at(BlockId::Number(30), 10, false);
assert_eq!(net.peer(0).client().info().chain.best_number, 40);
// peer 1 only ever had the long fork.
net.peer(1).push_blocks(10, false);
assert_eq!(net.peer(1).client().info().chain.best_number, 40);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
// poll until the two nodes connect, otherwise announcing the block will not work
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
assert_eq!(net.peer(0).client().info().chain.best_number, 40);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
// request explicit sync
let first_peer_id = net.peer(0).id();
net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number);
// peer 1 downloads the block.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn syncs_header_only_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(0);
let config = ProtocolConfig::default();
net.add_full_peer_with_states(&config, None);
net.add_full_peer_with_states(&config, Some(3));
net.peer(0).push_blocks(2, false);
net.peer(1).push_blocks(2, false);
net.peer(0).push_blocks(2, true);
let small_hash = net.peer(0).client().info().chain.best_hash;
let small_number = net.peer(0).client().info().chain.best_number;
net.peer(1).push_blocks(4, false);
net.block_until_sync(&mut runtime);
// Peer 1 will sync the small fork even though common block state is missing
assert_eq!(9, net.peer(0).blocks_count());
assert_eq!(9, net.peer(1).blocks_count());
// Request explicit header-only sync request for the ancient fork.
let first_peer_id = net.peer(0).id();
net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn does_not_sync_announced_old_best_block() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
let old_hash = net.peer(0).push_blocks(1, false);
let old_hash_with_parent = net.peer(0).push_blocks(1, false);
net.peer(0).push_blocks(18, true);
net.peer(1).push_blocks(20, true);
net.peer(0).announce_block(old_hash, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
// poll once to import announcement
net.poll();
Ok(Async::Ready(()))
})).unwrap();
assert!(!net.peer(1).is_major_syncing());
net.peer(0).announce_block(old_hash_with_parent, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
// poll once to import announcement
net.poll();
Ok(Async::Ready(()))
})).unwrap();
assert!(!net.peer(1).is_major_syncing());
}