// This file is part of Substrate. // Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . #![allow(missing_docs)] #[cfg(test)] mod block_import; #[cfg(test)] mod sync; use std::{ collections::HashMap, marker::PhantomData, pin::Pin, sync::Arc, task::{Context as FutureContext, Poll}, time::Duration, }; use async_std::future::timeout; use futures::{future::BoxFuture, prelude::*}; use libp2p::{build_multiaddr, PeerId}; use log::trace; use parking_lot::Mutex; use sc_block_builder::{BlockBuilder, BlockBuilderProvider}; use sc_client_api::{ backend::{AuxStore, Backend, Finalizer, TransactionFor}, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification, FinalityNotifications, ImportNotifications, }; use sc_consensus::{ BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportResult, JustificationImport, JustificationSyncLink, LongestChain, Verifier, }; use sc_network::{ config::{NetworkConfiguration, RequestResponseConfig, Role, SyncMode}, Multiaddr, NetworkService, NetworkWorker, }; use sc_network_common::{ config::{ MultiaddrWithPeerId, NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, TransportConfig, }, protocol::{role::Roles, ProtocolName}, service::{NetworkBlock, NetworkStateInfo, NetworkSyncForkRequest}, sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider}, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, warp_request_handler, ChainSync, }; use sc_service::client::Client; use sp_blockchain::{ well_known_cache_keys::{self, Id as CacheKeyId}, Backend as BlockchainBackend, HeaderBackend, Info as BlockchainInfo, Result as ClientResult, }; use sp_consensus::{ block_validation::{BlockAnnounceValidator, DefaultBlockAnnounceValidator}, BlockOrigin, Error as ConsensusError, SyncOracle, }; use sp_core::H256; use sp_runtime::{ codec::{Decode, Encode}, generic::{BlockId, OpaqueDigestItemId}, traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}, Justification, Justifications, }; use substrate_test_runtime_client::AccountKeyring; pub use substrate_test_runtime_client::{ runtime::{Block, Extrinsic, Hash, Transfer}, TestClient, TestClientBuilder, TestClientBuilderExt, }; type AuthorityId = sp_consensus_babe::AuthorityId; /// A Verifier that accepts all blocks and passes them on with the configured /// finality to be imported. #[derive(Clone)] pub struct PassThroughVerifier { finalized: bool, } impl PassThroughVerifier { /// Create a new instance. /// /// Every verified block will use `finalized` for the `BlockImportParams`. pub fn new(finalized: bool) -> Self { Self { finalized } } } /// This `Verifier` accepts all data as valid. #[async_trait::async_trait] impl Verifier for PassThroughVerifier { async fn verify( &mut self, mut block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { let maybe_keys = block .header .digest() .log(|l| { l.try_as_raw(OpaqueDigestItemId::Consensus(b"aura")) .or_else(|| l.try_as_raw(OpaqueDigestItemId::Consensus(b"babe"))) }) .map(|blob| vec![(well_known_cache_keys::AUTHORITIES, blob.to_vec())]); if block.fork_choice.is_none() { block.fork_choice = Some(ForkChoiceStrategy::LongestChain); }; block.finalized = self.finalized; Ok((block, maybe_keys)) } } pub type PeersFullClient = Client< substrate_test_runtime_client::Backend, substrate_test_runtime_client::ExecutorDispatch, Block, substrate_test_runtime_client::runtime::RuntimeApi, >; #[derive(Clone)] pub struct PeersClient { client: Arc, backend: Arc, } impl PeersClient { pub fn as_client(&self) -> Arc { self.client.clone() } pub fn as_backend(&self) -> Arc { self.backend.clone() } pub fn as_block_import(&self) -> BlockImportAdapter { BlockImportAdapter::new(self.clone()) } pub fn get_aux(&self, key: &[u8]) -> ClientResult>> { self.client.get_aux(key) } pub fn info(&self) -> BlockchainInfo { self.client.info() } pub fn header( &self, block: &BlockId, ) -> ClientResult::Header>> { self.client.header(block) } pub fn has_state_at(&self, block: &BlockId) -> bool { let header = match self.header(block).unwrap() { Some(header) => header, None => return false, }; self.backend.have_state_at(&header.hash(), *header.number()) } pub fn justifications(&self, block: &BlockId) -> ClientResult> { self.client.justifications(block) } pub fn finality_notification_stream(&self) -> FinalityNotifications { self.client.finality_notification_stream() } pub fn import_notification_stream(&self) -> ImportNotifications { self.client.import_notification_stream() } pub fn finalize_block( &self, hash: &::Hash, justification: Option, notify: bool, ) -> ClientResult<()> { self.client.finalize_block(hash, justification, notify) } } #[async_trait::async_trait] impl BlockImport for PeersClient { type Error = ConsensusError; type Transaction = (); async fn check_block( &mut self, block: BlockCheckParams, ) -> Result { self.client.check_block(block).await } async fn import_block( &mut self, block: BlockImportParams, cache: HashMap>, ) -> Result { self.client.import_block(block.clear_storage_changes_and_mutate(), cache).await } } pub struct Peer { pub data: D, client: PeersClient, /// We keep a copy of the verifier so that we can invoke it for locally-generated blocks, /// instead of going through the import queue. verifier: VerifierAdapter, /// We keep a copy of the block_import so that we can invoke it for locally-generated blocks, /// instead of going through the import queue. block_import: BlockImportAdapter, select_chain: Option>, backend: Option>, network: NetworkWorker::Hash, PeersFullClient>, imported_blocks_stream: Pin> + Send>>, finality_notification_stream: Pin> + Send>>, listen_addr: Multiaddr, } impl Peer where B: BlockImport + Send + Sync, B::Transaction: Send, { /// Get this peer ID. pub fn id(&self) -> PeerId { self.network.service().local_peer_id() } /// Returns true if we're major syncing. pub fn is_major_syncing(&self) -> bool { self.network.service().is_major_syncing() } // Returns a clone of the local SelectChain, only available on full nodes pub fn select_chain( &self, ) -> Option> { self.select_chain.clone() } /// Returns the number of peers we're connected to. pub fn num_peers(&self) -> usize { self.network.num_connected_peers() } /// Returns the number of downloaded blocks. pub fn num_downloaded_blocks(&self) -> usize { self.network.num_downloaded_blocks() } /// Returns true if we have no peer. pub fn is_offline(&self) -> bool { self.num_peers() == 0 } /// Request a justification for the given block. pub fn request_justification(&self, hash: &::Hash, number: NumberFor) { self.network.service().request_justification(hash, number); } /// Announces an important block on the network. pub fn announce_block(&self, hash: ::Hash, data: Option>) { self.network.service().announce_block(hash, data); } /// Request explicit fork sync. pub fn set_sync_fork_request( &self, peers: Vec, hash: ::Hash, number: NumberFor, ) { self.network.service().set_sync_fork_request(peers, hash, number); } /// Add blocks to the peer -- edit the block before adding pub fn generate_blocks(&mut self, count: usize, origin: BlockOrigin, edit_block: F) -> H256 where F: FnMut( BlockBuilder, ) -> Block, { let best_hash = self.client.info().best_hash; self.generate_blocks_at( BlockId::Hash(best_hash), count, origin, edit_block, false, true, true, ForkChoiceStrategy::LongestChain, ) } /// Add blocks to the peer -- edit the block before adding and use custom fork choice rule. pub fn generate_blocks_with_fork_choice( &mut self, count: usize, origin: BlockOrigin, edit_block: F, fork_choice: ForkChoiceStrategy, ) -> H256 where F: FnMut( BlockBuilder, ) -> Block, { let best_hash = self.client.info().best_hash; self.generate_blocks_at( BlockId::Hash(best_hash), count, origin, edit_block, false, true, true, fork_choice, ) } /// Add blocks to the peer -- edit the block before adding. The chain will /// start at the given block iD. pub fn generate_blocks_at( &mut self, at: BlockId, count: usize, origin: BlockOrigin, mut edit_block: F, headers_only: bool, inform_sync_about_new_best_block: bool, announce_block: bool, fork_choice: ForkChoiceStrategy, ) -> H256 where F: FnMut( BlockBuilder, ) -> Block, { let full_client = self.client.as_client(); let mut at = full_client.header(&at).unwrap().unwrap().hash(); for _ in 0..count { let builder = full_client.new_block_at(&BlockId::Hash(at), Default::default(), false).unwrap(); let block = edit_block(builder); let hash = block.header.hash(); trace!( target: "test_network", "Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash, ); let header = block.header.clone(); let mut import_block = BlockImportParams::new(origin, header.clone()); import_block.body = if headers_only { None } else { Some(block.extrinsics) }; import_block.fork_choice = Some(fork_choice); let (import_block, cache) = futures::executor::block_on(self.verifier.verify(import_block)).unwrap(); let cache = if let Some(cache) = cache { cache.into_iter().collect() } else { Default::default() }; futures::executor::block_on(self.block_import.import_block(import_block, cache)) .expect("block_import failed"); if announce_block { self.network.service().announce_block(hash, None); } at = hash; } if inform_sync_about_new_best_block { self.network.new_best_block_imported( at, *full_client.header(&BlockId::Hash(at)).ok().flatten().unwrap().number(), ); } at } /// Push blocks to the peer (simplified: with or without a TX) pub fn push_blocks(&mut self, count: usize, with_tx: bool) -> H256 { let best_hash = self.client.info().best_hash; self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } /// Push blocks to the peer (simplified: with or without a TX) pub fn push_headers(&mut self, count: usize) -> H256 { let best_hash = self.client.info().best_hash; self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true, true, true) } /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash. pub fn push_blocks_at(&mut self, at: BlockId, count: usize, with_tx: bool) -> H256 { self.generate_tx_blocks_at(at, count, with_tx, false, true, true) } /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash without informing the sync protocol about the new best block. pub fn push_blocks_at_without_informing_sync( &mut self, at: BlockId, count: usize, with_tx: bool, ) -> H256 { self.generate_tx_blocks_at(at, count, with_tx, false, false, true) } /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash without announcing the block. pub fn push_blocks_at_without_announcing( &mut self, at: BlockId, count: usize, with_tx: bool, ) -> H256 { self.generate_tx_blocks_at(at, count, with_tx, false, true, false) } /// Push blocks/headers to the peer (simplified: with or without a TX) starting from /// given hash. fn generate_tx_blocks_at( &mut self, at: BlockId, count: usize, with_tx: bool, headers_only: bool, inform_sync_about_new_best_block: bool, announce_block: bool, ) -> H256 { let mut nonce = 0; if with_tx { self.generate_blocks_at( at, count, BlockOrigin::File, |mut builder| { let transfer = Transfer { from: AccountKeyring::Alice.into(), to: AccountKeyring::Alice.into(), amount: 1, nonce, }; builder.push(transfer.into_signed_tx()).unwrap(); nonce += 1; builder.build().unwrap().block }, headers_only, inform_sync_about_new_best_block, announce_block, ForkChoiceStrategy::LongestChain, ) } else { self.generate_blocks_at( at, count, BlockOrigin::File, |builder| builder.build().unwrap().block, headers_only, inform_sync_about_new_best_block, announce_block, ForkChoiceStrategy::LongestChain, ) } } pub fn push_authorities_change_block(&mut self, new_authorities: Vec) -> H256 { self.generate_blocks(1, BlockOrigin::File, |mut builder| { builder.push(Extrinsic::AuthoritiesChange(new_authorities.clone())).unwrap(); builder.build().unwrap().block }) } /// Get a reference to the client. pub fn client(&self) -> &PeersClient { &self.client } /// Get a reference to the network service. pub fn network_service(&self) -> &Arc::Hash>> { self.network.service() } /// Get a reference to the network worker. pub fn network(&self) -> &NetworkWorker::Hash, PeersFullClient> { &self.network } /// Test helper to compare the blockchain state of multiple (networked) /// clients. pub fn blockchain_canon_equals(&self, other: &Self) -> bool { if let (Some(mine), Some(others)) = (self.backend.clone(), other.backend.clone()) { mine.blockchain().info().best_hash == others.blockchain().info().best_hash } else { false } } /// Count the total number of imported blocks. pub fn blocks_count(&self) -> u64 { self.backend .as_ref() .map(|backend| backend.blockchain().info().best_number) .unwrap_or(0) } /// Return a collection of block hashes that failed verification pub fn failed_verifications(&self) -> HashMap<::Hash, String> { self.verifier.failed_verifications.lock().clone() } pub fn has_block(&self, hash: &H256) -> bool { self.backend .as_ref() .map(|backend| backend.blockchain().header(BlockId::hash(*hash)).unwrap().is_some()) .unwrap_or(false) } pub fn has_body(&self, hash: &H256) -> bool { self.backend .as_ref() .map(|backend| backend.blockchain().body(BlockId::hash(*hash)).unwrap().is_some()) .unwrap_or(false) } } pub trait BlockImportAdapterFull: BlockImport< Block, Transaction = TransactionFor, Error = ConsensusError, > + Send + Sync + Clone { } impl BlockImportAdapterFull for T where T: BlockImport< Block, Transaction = TransactionFor, Error = ConsensusError, > + Send + Sync + Clone { } /// Implements `BlockImport` for any `Transaction`. Internally the transaction is /// "converted", aka the field is set to `None`. /// /// This is required as the `TestNetFactory` trait does not distinguish between /// full and light nodes. #[derive(Clone)] pub struct BlockImportAdapter { inner: I, _phantom: PhantomData, } impl BlockImportAdapter { /// Create a new instance of `Self::Full`. pub fn new(inner: I) -> Self { Self { inner, _phantom: PhantomData } } } #[async_trait::async_trait] impl BlockImport for BlockImportAdapter where I: BlockImport + Send + Sync, I::Transaction: Send, Transaction: Send + 'static, { type Error = ConsensusError; type Transaction = Transaction; async fn check_block( &mut self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await } async fn import_block( &mut self, block: BlockImportParams, cache: HashMap>, ) -> Result { self.inner.import_block(block.clear_storage_changes_and_mutate(), cache).await } } /// Implements `Verifier` and keeps track of failed verifications. struct VerifierAdapter { verifier: Arc>>>, failed_verifications: Arc>>, } #[async_trait::async_trait] impl Verifier for VerifierAdapter { async fn verify( &mut self, block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { let hash = block.header.hash(); self.verifier.lock().await.verify(block).await.map_err(|e| { self.failed_verifications.lock().insert(hash, e.clone()); e }) } } impl Clone for VerifierAdapter { fn clone(&self) -> Self { Self { verifier: self.verifier.clone(), failed_verifications: self.failed_verifications.clone(), } } } impl VerifierAdapter { fn new(verifier: impl Verifier + 'static) -> Self { VerifierAdapter { verifier: Arc::new(futures::lock::Mutex::new(Box::new(verifier))), failed_verifications: Default::default(), } } } struct TestWarpSyncProvider(Arc>); impl WarpSyncProvider for TestWarpSyncProvider { fn generate( &self, _start: B::Hash, ) -> Result> { let info = self.0.info(); let best_header = self.0.header(BlockId::hash(info.best_hash)).unwrap().unwrap(); Ok(EncodedProof(best_header.encode())) } fn verify( &self, proof: &EncodedProof, _set_id: SetId, _authorities: AuthorityList, ) -> Result, Box> { let EncodedProof(encoded) = proof; let header = B::Header::decode(&mut encoded.as_slice()).unwrap(); Ok(VerificationResult::Complete(0, Default::default(), header)) } fn current_authorities(&self) -> AuthorityList { Default::default() } } /// Configuration for a full peer. #[derive(Default)] pub struct FullPeerConfig { /// Pruning window size. /// /// NOTE: only finalized blocks are subject for removal! pub blocks_pruning: Option, /// Block announce validator. pub block_announce_validator: Option + Send + Sync>>, /// List of notification protocols that the network must support. pub notifications_protocols: Vec, /// List of request-response protocols that the network must support. pub request_response_protocols: Vec, /// The indices of the peers the peer should be connected to. /// /// If `None`, it will be connected to all other peers. pub connect_to_peers: Option>, /// Whether the full peer should have the authority role. pub is_authority: bool, /// Syncing mode pub sync_mode: SyncMode, /// Extra genesis storage. pub extra_storage: Option, /// Enable transaction indexing. pub storage_chain: bool, } pub trait TestNetFactory: Default + Sized where >::Transaction: Send, { type Verifier: 'static + Verifier; type BlockImport: BlockImport + Clone + Send + Sync + 'static; type PeerData: Default; /// This one needs to be implemented! fn make_verifier(&self, client: PeersClient, peer_data: &Self::PeerData) -> Self::Verifier; /// Get reference to peer. fn peer(&mut self, i: usize) -> &mut Peer; fn peers(&self) -> &Vec>; fn mut_peers>)>( &mut self, closure: F, ); /// Get custom block import handle for fresh client, along with peer data. fn make_block_import( &self, client: PeersClient, ) -> ( BlockImportAdapter, Option>, Self::PeerData, ); /// Create new test network with this many peers. fn new(n: usize) -> Self { trace!(target: "test_network", "Creating test network"); let mut net = Self::default(); for i in 0..n { trace!(target: "test_network", "Adding peer {}", i); net.add_full_peer(); } net } fn add_full_peer(&mut self) { self.add_full_peer_with_config(Default::default()) } /// Add a full peer. fn add_full_peer_with_config(&mut self, config: FullPeerConfig) { let mut test_client_builder = match (config.blocks_pruning, config.storage_chain) { (Some(blocks_pruning), true) => TestClientBuilder::with_tx_storage(blocks_pruning), (None, true) => TestClientBuilder::with_tx_storage(u32::MAX), (Some(blocks_pruning), false) => TestClientBuilder::with_pruning_window(blocks_pruning), (None, false) => TestClientBuilder::with_default_backend(), }; if let Some(storage) = config.extra_storage { let genesis_extra_storage = test_client_builder.genesis_init_mut().extra_storage(); *genesis_extra_storage = storage; } if matches!(config.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp) { test_client_builder = test_client_builder.set_no_genesis(); } let backend = test_client_builder.backend(); let (c, longest_chain) = test_client_builder.build_with_longest_chain(); let client = Arc::new(c); let (block_import, justification_import, data) = self .make_block_import(PeersClient { client: client.clone(), backend: backend.clone() }); let verifier = self .make_verifier(PeersClient { client: client.clone(), backend: backend.clone() }, &data); let verifier = VerifierAdapter::new(verifier); let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, &sp_core::testing::TaskExecutor::new(), None, )); let listen_addr = build_multiaddr![Memory(rand::random::())]; let mut network_config = NetworkConfiguration::new("test-node", "test-client", Default::default(), None); network_config.sync_mode = config.sync_mode; network_config.transport = TransportConfig::MemoryOnly; network_config.listen_addresses = vec![listen_addr.clone()]; network_config.allow_non_globals_in_dht = true; network_config .request_response_protocols .extend(config.request_response_protocols); network_config.extra_sets = config .notifications_protocols .into_iter() .map(|p| NonDefaultSetConfig { notifications_protocol: p, fallback_names: Vec::new(), max_notification_size: 1024 * 1024, handshake: None, set_config: Default::default(), }) .collect(); if let Some(connect_to) = config.connect_to_peers { let addrs = connect_to .iter() .map(|v| { let peer_id = self.peer(*v).network_service().local_peer_id(); let multiaddr = self.peer(*v).listen_addr.clone(); MultiaddrWithPeerId { peer_id, multiaddr } }) .collect(); network_config.default_peers_set.reserved_nodes = addrs; network_config.default_peers_set.non_reserved_mode = NonReservedPeerMode::Deny; } let protocol_id = ProtocolId::from("test-protocol-name"); let fork_id = Some(String::from("test-fork-id")); let block_request_protocol_config = { let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); self.spawn_task(handler.run().boxed()); protocol_config }; let state_request_protocol_config = { let (handler, protocol_config) = StateRequestHandler::new(&protocol_id, None, client.clone(), 50); self.spawn_task(handler.run().boxed()); protocol_config }; let light_client_request_protocol_config = { let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, None, client.clone()); self.spawn_task(handler.run().boxed()); protocol_config }; let warp_sync = Arc::new(TestWarpSyncProvider(client.clone())); let warp_protocol_config = { let (handler, protocol_config) = warp_request_handler::RequestHandler::new( protocol_id.clone(), client .block_hash(0u32.into()) .ok() .flatten() .expect("Genesis block exists; qed"), None, warp_sync.clone(), ); self.spawn_task(handler.run().boxed()); protocol_config }; let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (chain_sync, chain_sync_service) = ChainSync::new( match network_config.sync_mode { SyncMode::Full => sc_network_common::sync::SyncMode::Full, SyncMode::Fast { skip_proofs, storage_chain_mode } => sc_network_common::sync::SyncMode::LightState { skip_proofs, storage_chain_mode, }, SyncMode::Warp => sc_network_common::sync::SyncMode::Warp, }, client.clone(), block_announce_validator, network_config.max_parallel_downloads, Some(warp_sync), chain_sync_network_handle, ) .unwrap(); let block_announce_config = chain_sync.get_block_announce_proto_config( protocol_id.clone(), &fork_id, Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), client.info().best_number, client.info().best_hash, client .block_hash(Zero::zero()) .ok() .flatten() .expect("Genesis block exists; qed"), ); let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: None, network_config, chain: client.clone(), protocol_id, fork_id, import_queue, chain_sync: Box::new(chain_sync), chain_sync_service, metrics_registry: None, block_announce_config, block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, warp_sync_protocol_config: Some(warp_protocol_config), request_response_protocol_configs: Vec::new(), }) .unwrap(); trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id()); let service = network.service().clone(); async_std::task::spawn(async move { chain_sync_network_provider.run(service).await; }); self.mut_peers(move |peers| { for peer in peers.iter_mut() { peer.network .add_known_address(network.service().local_peer_id(), listen_addr.clone()); } let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse()); let finality_notification_stream = Box::pin(client.finality_notification_stream().fuse()); peers.push(Peer { data, client: PeersClient { client: client.clone(), backend: backend.clone() }, select_chain: Some(longest_chain), backend: Some(backend), imported_blocks_stream, finality_notification_stream, block_import, verifier, network, listen_addr, }); }); } /// Used to spawn background tasks, e.g. the block request protocol handler. fn spawn_task(&self, f: BoxFuture<'static, ()>) { async_std::task::spawn(f); } /// Polls the testnet until all nodes are in sync. /// /// Must be executed in a task context. fn poll_until_sync(&mut self, cx: &mut FutureContext) -> Poll<()> { self.poll(cx); // Return `NotReady` if there's a mismatch in the highest block number. let mut highest = None; for peer in self.peers().iter() { if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { return Poll::Pending } if peer.network.num_sync_requests() != 0 { return Poll::Pending } match (highest, peer.client.info().best_hash) { (None, b) => highest = Some(b), (Some(ref a), ref b) if a == b => {}, (Some(_), _) => return Poll::Pending, } } Poll::Ready(()) } /// Polls the testnet until theres' no activiy of any kind. /// /// Must be executed in a task context. fn poll_until_idle(&mut self, cx: &mut FutureContext) -> Poll<()> { self.poll(cx); for peer in self.peers().iter() { if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { return Poll::Pending } if peer.network.num_sync_requests() != 0 { return Poll::Pending } } Poll::Ready(()) } /// Polls the testnet until all peers are connected to each other. /// /// Must be executed in a task context. fn poll_until_connected(&mut self, cx: &mut FutureContext) -> Poll<()> { self.poll(cx); let num_peers = self.peers().len(); if self.peers().iter().all(|p| p.num_peers() == num_peers - 1) { return Poll::Ready(()) } Poll::Pending } /// Blocks the current thread until we are sync'ed. /// /// Calls `poll_until_sync` repeatedly. /// (If we've not synced within 10 mins then panic rather than hang.) fn block_until_sync(&mut self) { futures::executor::block_on(timeout( Duration::from_secs(10 * 60), futures::future::poll_fn::<(), _>(|cx| self.poll_until_sync(cx)), )) .expect("sync didn't happen within 10 mins"); } /// Blocks the current thread until there are no pending packets. /// /// Calls `poll_until_idle` repeatedly with the runtime passed as parameter. fn block_until_idle(&mut self) { futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| { self.poll_until_idle(cx) })); } /// Blocks the current thread until all peers are connected to each other. /// /// Calls `poll_until_connected` repeatedly with the runtime passed as parameter. fn block_until_connected(&mut self) { futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| { self.poll_until_connected(cx) })); } /// Polls the testnet. Processes all the pending actions. fn poll(&mut self, cx: &mut FutureContext) { self.mut_peers(|peers| { for (i, peer) in peers.iter_mut().enumerate() { trace!(target: "sync", "-- Polling {}: {}", i, peer.id()); if let Poll::Ready(()) = peer.network.poll_unpin(cx) { panic!("NetworkWorker has terminated unexpectedly.") } trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id()); // We poll `imported_blocks_stream`. while let Poll::Ready(Some(notification)) = peer.imported_blocks_stream.as_mut().poll_next(cx) { peer.network.service().announce_block(notification.hash, None); } // We poll `finality_notification_stream`. while let Poll::Ready(Some(notification)) = peer.finality_notification_stream.as_mut().poll_next(cx) { peer.network.on_block_finalized(notification.hash, notification.header); } } }); } } #[derive(Default)] pub struct TestNet { peers: Vec>, } impl TestNetFactory for TestNet { type Verifier = PassThroughVerifier; type PeerData = (); type BlockImport = PeersClient; fn make_verifier(&self, _client: PeersClient, _peer_data: &()) -> Self::Verifier { PassThroughVerifier::new(false) } fn make_block_import( &self, client: PeersClient, ) -> ( BlockImportAdapter, Option>, Self::PeerData, ) { (client.as_block_import(), None, ()) } fn peer(&mut self, i: usize) -> &mut Peer<(), Self::BlockImport> { &mut self.peers[i] } fn peers(&self) -> &Vec> { &self.peers } fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } } pub struct ForceFinalized(PeersClient); #[async_trait::async_trait] impl JustificationImport for ForceFinalized { type Error = ConsensusError; async fn on_start(&mut self) -> Vec<(H256, NumberFor)> { Vec::new() } async fn import_justification( &mut self, hash: H256, _number: NumberFor, justification: Justification, ) -> Result<(), Self::Error> { self.0 .finalize_block(&hash, Some(justification), true) .map_err(|_| ConsensusError::InvalidJustification) } } #[derive(Default)] pub struct JustificationTestNet(TestNet); impl TestNetFactory for JustificationTestNet { type Verifier = PassThroughVerifier; type PeerData = (); type BlockImport = PeersClient; fn make_verifier(&self, client: PeersClient, peer_data: &()) -> Self::Verifier { self.0.make_verifier(client, peer_data) } fn peer(&mut self, i: usize) -> &mut Peer { self.0.peer(i) } fn peers(&self) -> &Vec> { self.0.peers() } fn mut_peers>)>( &mut self, closure: F, ) { self.0.mut_peers(closure) } fn make_block_import( &self, client: PeersClient, ) -> ( BlockImportAdapter, Option>, Self::PeerData, ) { (client.as_block_import(), Some(Box::new(ForceFinalized(client))), Default::default()) } }