// Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! Substrate service components. use std::{sync::Arc, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; use keystore::KeyStorePtr; use client_db; use client::{self, Client, runtime_api}; use crate::{error, Service}; use consensus_common::{import_queue::ImportQueue, SelectChain}; use network::{ self, OnDemand, FinalityProofProvider, NetworkStateInfo, config::BoxFinalityProofRequestBuilder }; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use sr_primitives::{ BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId }; use crate::config::Configuration; use primitives::{Blake2Hasher, H256, traits::BareCryptoStorePtr}; use rpc::{self, system::SystemInfo}; use futures::{prelude::*, future::Executor}; use futures03::{FutureExt as _, channel::mpsc, compat::Compat}; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. /// Network service type for `Components`. pub type NetworkService = network::NetworkService< ComponentBlock, <::Factory as ServiceFactory>::NetworkProtocol, ComponentExHash >; /// Code executor type for a factory. pub type CodeExecutor = NativeExecutor<::RuntimeDispatch>; /// Full client backend type for a factory. pub type FullBackend = client_db::Backend<::Block>; /// Full client executor type for a factory. pub type FullExecutor = client::LocalCallExecutor< client_db::Backend<::Block>, CodeExecutor, >; /// Light client backend type for a factory. pub type LightBackend = client::light::backend::Backend< client_db::light::LightStorage<::Block>, network::OnDemand<::Block>, Blake2Hasher, >; /// Light client executor type for a factory. pub type LightExecutor = client::light::call_executor::RemoteOrLocalCallExecutor< ::Block, client::light::backend::Backend< client_db::light::LightStorage<::Block>, network::OnDemand<::Block>, Blake2Hasher >, client::light::call_executor::RemoteCallExecutor< client::light::blockchain::Blockchain< client_db::light::LightStorage<::Block>, network::OnDemand<::Block> >, network::OnDemand<::Block>, >, client::LocalCallExecutor< client::light::backend::Backend< client_db::light::LightStorage<::Block>, network::OnDemand<::Block>, Blake2Hasher >, CodeExecutor > >; /// Full client type for a factory. pub type FullClient = Client, FullExecutor, ::Block, ::RuntimeApi>; /// Light client type for a factory. pub type LightClient = Client, LightExecutor, ::Block, ::RuntimeApi>; /// `ChainSpec` specialization for a factory. pub type FactoryChainSpec = ChainSpec<::Genesis>; /// `Genesis` specialization for a factory. pub type FactoryGenesis = ::Genesis; /// `Block` type for a factory. pub type FactoryBlock = ::Block; /// `Extrinsic` type for a factory. pub type FactoryExtrinsic = <::Block as BlockT>::Extrinsic; /// `Number` type for a factory. pub type FactoryBlockNumber = < as BlockT>::Header as HeaderT>::Number; /// Full `Configuration` type for a factory. pub type FactoryFullConfiguration = Configuration<::Configuration, FactoryGenesis>; /// Client type for `Components`. pub type ComponentClient = Client< ::Backend, ::Executor, FactoryBlock<::Factory>, ::RuntimeApi, >; /// A offchain workers storage backend type. pub type ComponentOffchainStorage = < ::Backend as client::backend::Backend, Blake2Hasher> >::OffchainStorage; /// Block type for `Components` pub type ComponentBlock = <::Factory as ServiceFactory>::Block; /// Extrinsic hash type for `Components` pub type ComponentExHash = <::TransactionPoolApi as txpool::ChainApi>::Hash; /// Extrinsic type. pub type ComponentExtrinsic = as BlockT>::Extrinsic; /// Extrinsic pool API type for `Components`. pub type PoolApi = ::TransactionPoolApi; /// A set of traits for the runtime genesis config. pub trait RuntimeGenesis: Serialize + DeserializeOwned + BuildStorage {} impl RuntimeGenesis for T {} /// A transport-agnostic handler of the RPC queries. pub type RpcHandler = rpc_servers::RpcHandler; /// Something that can create and store initial session keys from given seeds. pub trait InitialSessionKeys { /// Generate the initial session keys for the given seeds and store them in /// an internal keystore. fn generate_initial_session_keys( client: Arc>, seeds: Vec, ) -> error::Result<()>; } impl InitialSessionKeys for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: session::SessionKeys>, { fn generate_initial_session_keys( client: Arc>, seeds: Vec, ) -> error::Result<()> { session::generate_initial_session_keys(client, seeds).map_err(Into::into) } } /// Something that can start the RPC service. pub trait StartRpc { fn start_rpc( client: Arc>, system_send_back: mpsc::UnboundedSender>>, system_info: SystemInfo, task_executor: TaskExecutor, transaction_pool: Arc>, rpc_extensions: impl rpc::RpcExtension, keystore: KeyStorePtr, ) -> RpcHandler; } impl StartRpc for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::Metadata> + session::SessionKeys>, { fn start_rpc( client: Arc>, system_send_back: mpsc::UnboundedSender>>, rpc_system_info: SystemInfo, task_executor: TaskExecutor, transaction_pool: Arc>, rpc_extensions: impl rpc::RpcExtension, keystore: KeyStorePtr, ) -> RpcHandler { use rpc::{chain, state, author, system}; let subscriptions = rpc::Subscriptions::new(task_executor.clone()); let chain = chain::Chain::new(client.clone(), subscriptions.clone()); let state = state::State::new(client.clone(), subscriptions.clone()); let author = rpc::author::Author::new( client, transaction_pool, subscriptions, keystore, ); let system = system::System::new(rpc_system_info, system_send_back); rpc_servers::rpc_handler(( state::StateApi::to_delegate(state), chain::ChainApi::to_delegate(chain), author::AuthorApi::to_delegate(author), system::SystemApi::to_delegate(system), rpc_extensions, )) } } /// Something that can maintain transaction pool on every imported block. pub trait MaintainTransactionPool { fn maintain_transaction_pool( id: &BlockId>, client: &ComponentClient, transaction_pool: &TransactionPool, ) -> error::Result<()>; } fn maintain_transaction_pool( id: &BlockId, client: &Client, transaction_pool: &TransactionPool, ) -> error::Result<()> where Block: BlockT::Out>, Backend: client::backend::Backend, Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue, Executor: client::CallExecutor, PoolApi: txpool::ChainApi, { // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { return Ok(()) } if let Some(block) = client.block(id)? { let parent_id = BlockId::hash(*block.block.header().parent_hash()); let extrinsics = block.block.extrinsics(); transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?; } Ok(()) } impl MaintainTransactionPool for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: runtime_api::TaggedTransactionQueue>, { fn maintain_transaction_pool( id: &BlockId>, client: &ComponentClient, transaction_pool: &TransactionPool, ) -> error::Result<()> { maintain_transaction_pool(id, client, transaction_pool) } } pub trait OffchainWorker { fn offchain_workers( number: &FactoryBlockNumber, offchain: &offchain::OffchainWorkers< ComponentClient, ComponentOffchainStorage, ComponentBlock >, pool: &Arc>, network_state: &Arc, is_validator: bool, ) -> error::Result + Send>>; } impl OffchainWorker for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: offchain::OffchainWorkerApi>, { fn offchain_workers( number: &FactoryBlockNumber, offchain: &offchain::OffchainWorkers< ComponentClient, ComponentOffchainStorage, ComponentBlock >, pool: &Arc>, network_state: &Arc, is_validator: bool, ) -> error::Result + Send>> { let future = offchain.on_block_imported(number, pool, network_state.clone(), is_validator) .map(|()| Ok(())); Ok(Box::new(Compat::new(future))) } } /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> + Send + 'static + StartRpc + MaintainTransactionPool + OffchainWorker + InitialSessionKeys {} impl ServiceTrait for T where T: Deref> + Send + 'static + StartRpc + MaintainTransactionPool + OffchainWorker + InitialSessionKeys {} /// Alias for a an implementation of `futures::future::Executor`. pub type TaskExecutor = Arc + Send>> + Send + Sync>; /// A collection of types and methods to build a service on top of the substrate service. pub trait ServiceFactory: 'static + Sized { /// Block type. type Block: BlockT; /// The type that implements the runtime API. type RuntimeApi: Send + Sync; /// Network protocol extensions. type NetworkProtocol: network::specialization::NetworkSpecialization; /// Chain runtime. type RuntimeDispatch: NativeExecutionDispatch + Send + Sync + 'static; /// Extrinsic pool backend type for the full client. type FullTransactionPoolApi: txpool::ChainApi::Hash, Block = Self::Block> + Send + 'static; /// Extrinsic pool backend type for the light client. type LightTransactionPoolApi: txpool::ChainApi::Hash, Block = Self::Block> + 'static; /// Genesis configuration for the runtime. type Genesis: RuntimeGenesis; /// Other configuration for service members. type Configuration: Default; /// RPC initialisation. type RpcExtensions: rpc::RpcExtension; /// Extended full service type. type FullService: ServiceTrait>; /// Extended light service type. type LightService: ServiceTrait>; /// ImportQueue for full client type FullImportQueue: ImportQueue + 'static; /// ImportQueue for light clients type LightImportQueue: ImportQueue + 'static; /// The Fork Choice Strategy for the chain type SelectChain: SelectChain + 'static; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. fn build_full_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; /// Extrinsic pool constructor for the light client. fn build_light_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; /// Build network protocol. fn build_network_protocol(config: &FactoryFullConfiguration) -> Result; /// Build finality proof provider for serving network requests on full node. fn build_finality_proof_provider( client: Arc> ) -> Result>>, error::Error>; /// Build the Fork Choice algorithm for full client fn build_select_chain( config: &mut FactoryFullConfiguration, client: Arc>, ) -> Result; /// Build full service. fn new_full(config: FactoryFullConfiguration) -> Result; /// Build light service. fn new_light(config: FactoryFullConfiguration) -> Result; /// ImportQueue for a full client fn build_full_import_queue( config: &mut FactoryFullConfiguration, _client: Arc>, _select_chain: Self::SelectChain, _transaction_pool: Option>>, ) -> Result { if let Some(name) = config.chain_spec.consensus_engine() { match name { _ => Err(format!("Chain Specification defines unknown consensus engine '{}'", name).into()) } } else { Err("Chain Specification doesn't contain any consensus_engine name".into()) } } /// ImportQueue for a light client fn build_light_import_queue( config: &mut FactoryFullConfiguration, _client: Arc> ) -> Result<(Self::LightImportQueue, BoxFinalityProofRequestBuilder), error::Error> { if let Some(name) = config.chain_spec.consensus_engine() { match name { _ => Err(format!("Chain Specification defines unknown consensus engine '{}'", name).into()) } } else { Err("Chain Specification doesn't contain any consensus_engine name".into()) } } /// Create custom RPC method handlers for full node. fn build_full_rpc_extensions( client: Arc>, transaction_pool: Arc>, ) -> Self::RpcExtensions; /// Create custom RPC method handlers for light node. fn build_light_rpc_extensions( client: Arc>, transaction_pool: Arc>, ) -> Self::RpcExtensions; } /// A collection of types and function to generalize over full / light client type. pub trait Components: Sized + 'static { /// Associated service factory. type Factory: ServiceFactory; /// Client backend. type Backend: 'static + client::backend::Backend, Blake2Hasher>; /// Client executor. type Executor: 'static + client::CallExecutor, Blake2Hasher> + Send + Sync + Clone; /// The type that implements the runtime API. type RuntimeApi: Send + Sync; /// The type that can start all runtime-dependent services. type RuntimeServices: ServiceTrait; /// The type that can extend the RPC methods. type RpcExtensions: rpc::RpcExtension; // TODO: Traitify transaction pool and allow people to implement their own. (#1242) /// Extrinsic pool type. type TransactionPoolApi: 'static + txpool::ChainApi< Hash = as BlockT>::Hash, Block = FactoryBlock >; /// Our Import Queue type ImportQueue: ImportQueue> + 'static; /// The Fork Choice Strategy for the chain type SelectChain: SelectChain>; /// Create client. fn build_client( config: &FactoryFullConfiguration, executor: CodeExecutor, keystore: Option, ) -> Result< ( Arc>, Option>>> ), error::Error >; /// Create extrinsic pool. fn build_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; /// Build the queue that imports blocks from the network, and optionally a way for the network /// to build requests for proofs of finality. fn build_import_queue( config: &mut FactoryFullConfiguration, client: Arc>, select_chain: Option, _transaction_pool: Option>>, ) -> Result<(Self::ImportQueue, Option>>), error::Error>; /// Finality proof provider for serving network requests. fn build_finality_proof_provider( client: Arc> ) -> Result::Block>>>, error::Error>; /// Build fork choice selector fn build_select_chain( config: &mut FactoryFullConfiguration, client: Arc> ) -> Result, error::Error>; /// Build RPC extensions fn build_rpc_extensions( client: Arc>, transaction_pool: Arc>, ) -> Self::RpcExtensions; } /// A struct that implement `Components` for the full client. pub struct FullComponents { service: Service>, } impl FullComponents { /// Create new `FullComponents` pub fn new( config: FactoryFullConfiguration ) -> Result { Ok( Self { service: Service::new(config)?, } ) } } impl Deref for FullComponents { type Target = Service; fn deref(&self) -> &Self::Target { &self.service } } impl DerefMut for FullComponents { fn deref_mut(&mut self) -> &mut Service { &mut self.service } } impl Future for FullComponents { type Item = (); type Error = super::Error; fn poll(&mut self) -> Poll { self.service.poll() } } impl Executor + Send>> for FullComponents { fn execute( &self, future: Box + Send> ) -> Result<(), futures::future::ExecuteError + Send>>> { self.service.execute(future) } } impl Components for FullComponents { type Factory = Factory; type Executor = FullExecutor; type Backend = FullBackend; type TransactionPoolApi = ::FullTransactionPoolApi; type ImportQueue = Factory::FullImportQueue; type RuntimeApi = Factory::RuntimeApi; type RuntimeServices = Factory::FullService; type RpcExtensions = Factory::RpcExtensions; type SelectChain = Factory::SelectChain; fn build_client( config: &FactoryFullConfiguration, executor: CodeExecutor, keystore: Option, ) -> Result< (Arc>, Option>>>), error::Error, > { let db_settings = client_db::DatabaseSettings { cache_size: config.database_cache_size.map(|u| u as usize), state_cache_size: config.state_cache_size, state_cache_child_ratio: config.state_cache_child_ratio.map(|v| (v, 100)), path: config.database_path.clone(), pruning: config.pruning.clone(), }; Ok(( Arc::new( client_db::new_client( db_settings, executor, &config.chain_spec, config.execution_strategies.clone(), keystore, )? ), None, )) } fn build_transaction_pool( config: TransactionPoolOptions, client: Arc> ) -> Result, error::Error> { Factory::build_full_transaction_pool(config, client) } fn build_import_queue( config: &mut FactoryFullConfiguration, client: Arc>, select_chain: Option, transaction_pool: Option>>, ) -> Result<(Self::ImportQueue, Option>>), error::Error> { let select_chain = select_chain .ok_or(error::Error::SelectChainRequired)?; Factory::build_full_import_queue(config, client, select_chain, transaction_pool) .map(|queue| (queue, None)) } fn build_select_chain( config: &mut FactoryFullConfiguration, client: Arc> ) -> Result, error::Error> { Self::Factory::build_select_chain(config, client).map(Some) } fn build_finality_proof_provider( client: Arc> ) -> Result::Block>>>, error::Error> { Factory::build_finality_proof_provider(client) } fn build_rpc_extensions( client: Arc>, transaction_pool: Arc>, ) -> Self::RpcExtensions { Factory::build_full_rpc_extensions(client, transaction_pool) } } /// A struct that implement `Components` for the light client. pub struct LightComponents { service: Service>, } impl LightComponents { /// Create new `LightComponents` pub fn new( config: FactoryFullConfiguration, ) -> Result { Ok( Self { service: Service::new(config)?, } ) } } impl Deref for LightComponents { type Target = Service; fn deref(&self) -> &Self::Target { &self.service } } impl DerefMut for LightComponents { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.service } } impl Future for LightComponents { type Item = (); type Error = super::Error; fn poll(&mut self) -> Poll { self.service.poll() } } impl Executor + Send>> for LightComponents { fn execute( &self, future: Box + Send> ) -> Result<(), futures::future::ExecuteError + Send>>> { self.service.execute(future) } } impl Components for LightComponents { type Factory = Factory; type Executor = LightExecutor; type Backend = LightBackend; type TransactionPoolApi = ::LightTransactionPoolApi; type ImportQueue = ::LightImportQueue; type RuntimeApi = Factory::RuntimeApi; type RuntimeServices = Factory::LightService; type RpcExtensions = Factory::RpcExtensions; type SelectChain = Factory::SelectChain; fn build_client( config: &FactoryFullConfiguration, executor: CodeExecutor, _: Option, ) -> Result< ( Arc>, Option>>> ), error::Error> { let db_settings = client_db::DatabaseSettings { cache_size: None, state_cache_size: config.state_cache_size, state_cache_child_ratio: config.state_cache_child_ratio.map(|v| (v, 100)), path: config.database_path.clone(), pruning: config.pruning.clone(), }; let db_storage = client_db::light::LightStorage::new(db_settings)?; let light_blockchain = client::light::new_light_blockchain(db_storage); let fetch_checker = Arc::new( client::light::new_fetch_checker(light_blockchain.clone(), executor.clone()) ); let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); let client_backend = client::light::new_light_backend(light_blockchain, fetcher.clone()); let client = client::light::new_light(client_backend, fetcher.clone(), &config.chain_spec, executor)?; Ok((Arc::new(client), Some(fetcher))) } fn build_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error> { Factory::build_light_transaction_pool(config, client) } fn build_import_queue( config: &mut FactoryFullConfiguration, client: Arc>, _select_chain: Option, _transaction_pool: Option>>, ) -> Result<(Self::ImportQueue, Option>>), error::Error> { Factory::build_light_import_queue(config, client) .map(|(queue, builder)| (queue, Some(builder))) } fn build_finality_proof_provider( _client: Arc> ) -> Result::Block>>>, error::Error> { Ok(None) } fn build_select_chain( _config: &mut FactoryFullConfiguration, _client: Arc> ) -> Result, error::Error> { Ok(None) } fn build_rpc_extensions( client: Arc>, transaction_pool: Arc>, ) -> Self::RpcExtensions { Factory::build_light_rpc_extensions(client, transaction_pool) } } #[cfg(test)] mod tests { use super::*; use consensus_common::BlockOrigin; use substrate_test_runtime_client::{prelude::*, runtime::Transfer}; #[test] fn should_remove_transactions_from_the_pool() { let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain(); let client = Arc::new(client); let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); let transaction = Transfer { amount: 5, nonce: 0, from: AccountKeyring::Alice.into(), to: Default::default(), }.into_signed_tx(); let best = longest_chain.best_chain().unwrap(); // store the transaction in the pool pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap(); // import the block let mut builder = client.new_block(Default::default()).unwrap(); builder.push(transaction.clone()).unwrap(); let block = builder.bake().unwrap(); let id = BlockId::hash(block.header().hash()); client.import(BlockOrigin::Own, block).unwrap(); // fire notification - this should clean up the queue assert_eq!(pool.status().ready, 1); maintain_transaction_pool( &id, &client, &pool, ).unwrap(); // then assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); } }