// Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! Substrate service components. use std::{sync::Arc, net::SocketAddr, marker::PhantomData, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use tokio::runtime::TaskExecutor; use crate::chain_spec::ChainSpec; use client_db; use client::{self, Client, runtime_api::{Metadata, TaggedTransactionQueue}}; use crate::{error, Service, maybe_start_server}; use consensus_common::import_queue::ImportQueue; use network::{self, OnDemand}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use runtime_primitives::{ BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId }; use crate::config::Configuration; use primitives::{Blake2Hasher, H256}; use rpc::{self, apis::system::SystemInfo}; use parking_lot::Mutex; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. /// Network service type for a factory. pub type NetworkService = network::Service<::Block, ::NetworkProtocol>; /// Code executor type for a factory. pub type CodeExecutor = NativeExecutor<::RuntimeDispatch>; /// Full client backend type for a factory. 
pub type FullBackend<F> = client_db::Backend<<F as ServiceFactory>::Block>;

/// Full client executor type for a factory.
///
/// Executes calls locally on top of the database backend using the
/// factory's native code executor.
pub type FullExecutor<F> = client::LocalCallExecutor<
    client_db::Backend<<F as ServiceFactory>::Block>,
    CodeExecutor<F>,
>;

/// Light client backend type for a factory.
pub type LightBackend<F> = client::light::backend::Backend<
    client_db::light::LightStorage<<F as ServiceFactory>::Block>,
    network::OnDemand<<F as ServiceFactory>::Block>,
    Blake2Hasher,
>;

/// Light client executor type for a factory.
///
/// Combines a remote call executor (backed by the on-demand network service)
/// with a local call executor running on the light backend.
pub type LightExecutor<F> = client::light::call_executor::RemoteOrLocalCallExecutor<
    <F as ServiceFactory>::Block,
    client::light::backend::Backend<
        client_db::light::LightStorage<<F as ServiceFactory>::Block>,
        network::OnDemand<<F as ServiceFactory>::Block>,
        Blake2Hasher
    >,
    client::light::call_executor::RemoteCallExecutor<
        client::light::blockchain::Blockchain<
            client_db::light::LightStorage<<F as ServiceFactory>::Block>,
            network::OnDemand<<F as ServiceFactory>::Block>
        >,
        network::OnDemand<<F as ServiceFactory>::Block>
    >,
    client::LocalCallExecutor<
        client::light::backend::Backend<
            client_db::light::LightStorage<<F as ServiceFactory>::Block>,
            network::OnDemand<<F as ServiceFactory>::Block>,
            Blake2Hasher
        >,
        CodeExecutor<F>
    >
>;

/// Full client type for a factory.
pub type FullClient<F> = Client<
    FullBackend<F>,
    FullExecutor<F>,
    <F as ServiceFactory>::Block,
    <F as ServiceFactory>::RuntimeApi,
>;

/// Light client type for a factory.
pub type LightClient<F> = Client<
    LightBackend<F>,
    LightExecutor<F>,
    <F as ServiceFactory>::Block,
    <F as ServiceFactory>::RuntimeApi,
>;

/// `ChainSpec` specialization for a factory.
pub type FactoryChainSpec<F> = ChainSpec<<F as ServiceFactory>::Genesis>;

/// `Genesis` specialization for a factory.
pub type FactoryGenesis<F> = <F as ServiceFactory>::Genesis;

/// `Block` type for a factory.
pub type FactoryBlock<F> = <F as ServiceFactory>::Block;

/// `Extrinsic` type for a factory.
pub type FactoryExtrinsic<F> = <<F as ServiceFactory>::Block as BlockT>::Extrinsic;

/// `Number` type for a factory.
pub type FactoryBlockNumber<F> = <<FactoryBlock<F> as BlockT>::Header as HeaderT>::Number;

/// Full `Configuration` type for a factory.
pub type FactoryFullConfiguration<F> = Configuration<
    <F as ServiceFactory>::Configuration,
    FactoryGenesis<F>,
>;

/// Client type for `Components`.
pub type ComponentClient = Client< ::Backend, ::Executor, FactoryBlock<::Factory>, ::RuntimeApi, >; /// Block type for `Components` pub type ComponentBlock = <::Factory as ServiceFactory>::Block; /// Extrinsic hash type for `Components` pub type ComponentExHash = <::TransactionPoolApi as txpool::ChainApi>::Hash; /// Extrinsic type. pub type ComponentExtrinsic = as BlockT>::Extrinsic; /// Extrinsic pool API type for `Components`. pub type PoolApi = ::TransactionPoolApi; /// A set of traits for the runtime genesis config. pub trait RuntimeGenesis: Serialize + DeserializeOwned + BuildStorage {} impl RuntimeGenesis for T {} /// Something that can start the RPC service. pub trait StartRPC { type ServersHandle: Send + Sync; fn start_rpc( client: Arc>, network: Arc>>, should_have_peers: bool, system_info: SystemInfo, rpc_http: Option, rpc_ws: Option, task_executor: TaskExecutor, transaction_pool: Arc>, ) -> error::Result; } impl StartRPC for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: Metadata>, { type ServersHandle = (Option, Option>); fn start_rpc( client: Arc>, network: Arc>>, should_have_peers: bool, rpc_system_info: SystemInfo, rpc_http: Option, rpc_ws: Option, task_executor: TaskExecutor, transaction_pool: Arc>, ) -> error::Result { let handler = || { let client = client.clone(); let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone()); let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone()); let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone()); let author = rpc::apis::author::Author::new( client.clone(), transaction_pool.clone(), subscriptions ); let system = rpc::apis::system::System::new( rpc_system_info.clone(), network.clone(), should_have_peers ); rpc::rpc_handler::, ComponentExHash, _, _, _, _>( state, chain, author, system, ) }; Ok(( maybe_start_server(rpc_http, |address| rpc::start_http(address, handler()))?, maybe_start_server(rpc_ws, |address| 
rpc::start_ws(address, handler()))?.map(Mutex::new), )) } } /// Something that can maintain transaction pool on every imported block. pub trait MaintainTransactionPool { fn on_block_imported( id: &BlockId>, client: &ComponentClient, transaction_pool: &TransactionPool, ) -> error::Result<()>; } fn on_block_imported( id: &BlockId, client: &Client, transaction_pool: &TransactionPool, ) -> error::Result<()> where Block: BlockT::Out>, Backend: client::backend::Backend, Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: TaggedTransactionQueue, Executor: client::CallExecutor, PoolApi: txpool::ChainApi, { // Avoid calling into runtime if there is nothing to prune from the pool anyway. if transaction_pool.status().is_empty() { return Ok(()) } if let Some(block) = client.block(id)? { let parent_id = BlockId::hash(*block.block.header().parent_hash()); let extrinsics = block.block.extrinsics(); transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?; } Ok(()) } impl MaintainTransactionPool for C where ComponentClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: TaggedTransactionQueue>, { fn on_block_imported( id: &BlockId>, client: &ComponentClient, transaction_pool: &TransactionPool, ) -> error::Result<()> { on_block_imported(id, client, transaction_pool) } } /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> + Send + Sync + 'static + StartRPC + MaintainTransactionPool {} impl ServiceTrait for T where T: Deref> + Send + Sync + 'static + StartRPC + MaintainTransactionPool {} /// A collection of types and methods to build a service on top of the substrate service. pub trait ServiceFactory: 'static + Sized { /// Block type. type Block: BlockT; /// The type that implements the runtime API. type RuntimeApi: Send + Sync; /// Network protocol extensions. type NetworkProtocol: network::specialization::NetworkSpecialization; /// Chain runtime. 
type RuntimeDispatch: NativeExecutionDispatch + Send + Sync + 'static; /// Extrinsic pool backend type for the full client. type FullTransactionPoolApi: txpool::ChainApi::Hash, Block = Self::Block> + Send + 'static; /// Extrinsic pool backend type for the light client. type LightTransactionPoolApi: txpool::ChainApi::Hash, Block = Self::Block> + 'static; /// Genesis configuration for the runtime. type Genesis: RuntimeGenesis; /// Other configuration for service members. type Configuration: Default; /// Extended full service type. type FullService: ServiceTrait>; /// Extended light service type. type LightService: ServiceTrait>; /// ImportQueue for full client type FullImportQueue: consensus_common::import_queue::ImportQueue + 'static; /// ImportQueue for light clients type LightImportQueue: consensus_common::import_queue::ImportQueue + 'static; //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. fn build_full_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; /// Extrinsic pool constructor for the light client. fn build_light_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; /// Build network protocol. fn build_network_protocol(config: &FactoryFullConfiguration) -> Result; /// Build full service. fn new_full(config: FactoryFullConfiguration, executor: TaskExecutor) -> Result; /// Build light service. 
fn new_light(config: FactoryFullConfiguration, executor: TaskExecutor) -> Result; /// ImportQueue for a full client fn build_full_import_queue( config: &mut FactoryFullConfiguration, _client: Arc> ) -> Result { if let Some(name) = config.chain_spec.consensus_engine() { match name { _ => Err(format!("Chain Specification defines unknown consensus engine '{}'", name).into()) } } else { Err("Chain Specification doesn't contain any consensus_engine name".into()) } } /// ImportQueue for a light client fn build_light_import_queue( config: &mut FactoryFullConfiguration, _client: Arc> ) -> Result { if let Some(name) = config.chain_spec.consensus_engine() { match name { _ => Err(format!("Chain Specification defines unknown consensus engine '{}'", name).into()) } } else { Err("Chain Specification doesn't contain any consensus_engine name".into()) } } } /// A collection of types and function to generalise over full / light client type. pub trait Components: Sized + 'static { /// Associated service factory. type Factory: ServiceFactory; /// Client backend. type Backend: 'static + client::backend::Backend, Blake2Hasher>; /// Client executor. type Executor: 'static + client::CallExecutor, Blake2Hasher> + Send + Sync + Clone; /// The type that implements the runtime API. type RuntimeApi: Send + Sync; /// A type that can start the RPC. type RPC: StartRPC; // TODO: Traitify transaction pool and allow people to implement their own. (#1242) /// A type that can maintain transaction pool. type TransactionPool: MaintainTransactionPool; /// Extrinsic pool type. type TransactionPoolApi: 'static + txpool::ChainApi< Hash = as BlockT>::Hash, Block = FactoryBlock >; /// Our Import Queue type ImportQueue: ImportQueue> + 'static; /// Create client. fn build_client( config: &FactoryFullConfiguration, executor: CodeExecutor, ) -> Result< ( Arc>, Option>>> ), error::Error >; /// Create extrinsic pool. 
fn build_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; /// instance of import queue for clients fn build_import_queue( config: &mut FactoryFullConfiguration, client: Arc> ) -> Result; } /// A struct that implement `Components` for the full client. pub struct FullComponents { _factory: PhantomData, service: Service>, } impl FullComponents { pub fn new( config: FactoryFullConfiguration, task_executor: TaskExecutor ) -> Result { Ok( Self { _factory: Default::default(), service: Service::new(config, task_executor)?, } ) } } impl Deref for FullComponents { type Target = Service; fn deref(&self) -> &Self::Target { &self.service } } impl DerefMut for FullComponents { fn deref_mut(&mut self) -> &mut Service { &mut self.service } } impl Components for FullComponents { type Factory = Factory; type Executor = FullExecutor; type Backend = FullBackend; type TransactionPoolApi = ::FullTransactionPoolApi; type ImportQueue = Factory::FullImportQueue; type RuntimeApi = Factory::RuntimeApi; type RPC = Factory::FullService; type TransactionPool = Factory::FullService; fn build_client( config: &FactoryFullConfiguration, executor: CodeExecutor, ) -> Result<( Arc>, Option>>> ), error::Error> { let db_settings = client_db::DatabaseSettings { cache_size: config.database_cache_size.map(|u| u as usize), path: config.database_path.as_str().into(), pruning: config.pruning.clone(), }; Ok((Arc::new(client_db::new_client( db_settings, executor, &config.chain_spec, config.execution_strategies.clone(), )?), None)) } fn build_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error> { Factory::build_full_transaction_pool(config, client) } fn build_import_queue( config: &mut FactoryFullConfiguration, client: Arc> ) -> Result { Factory::build_full_import_queue(config, client) } } /// A struct that implement `Components` for the light client. 
pub struct LightComponents { _factory: PhantomData, service: Service>, } impl LightComponents { pub fn new( config: FactoryFullConfiguration, task_executor: TaskExecutor ) -> Result { Ok( Self { _factory: Default::default(), service: Service::new(config, task_executor)?, } ) } } impl Deref for LightComponents { type Target = Service; fn deref(&self) -> &Self::Target { &self.service } } impl Components for LightComponents { type Factory = Factory; type Executor = LightExecutor; type Backend = LightBackend; type TransactionPoolApi = ::LightTransactionPoolApi; type ImportQueue = ::LightImportQueue; type RuntimeApi = Factory::RuntimeApi; type RPC = Factory::LightService; type TransactionPool = Factory::LightService; fn build_client( config: &FactoryFullConfiguration, executor: CodeExecutor, ) -> Result< ( Arc>, Option>>> ), error::Error> { let db_settings = client_db::DatabaseSettings { cache_size: None, path: config.database_path.as_str().into(), pruning: config.pruning.clone(), }; let db_storage = client_db::light::LightStorage::new(db_settings)?; let light_blockchain = client::light::new_light_blockchain(db_storage); let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor.clone())); let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); let client_backend = client::light::new_light_backend(light_blockchain, fetcher.clone()); let client = client::light::new_light(client_backend, fetcher.clone(), &config.chain_spec, executor)?; Ok((Arc::new(client), Some(fetcher))) } fn build_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error> { Factory::build_light_transaction_pool(config, client) } fn build_import_queue( config: &mut FactoryFullConfiguration, client: Arc> ) -> Result { Factory::build_light_import_queue(config, client) } } #[cfg(test)] mod tests { use super::*; use parity_codec::Encode; use consensus_common::BlockOrigin; use substrate_test_client::{self, TestClient, AccountKeyring, 
runtime::{Extrinsic, Transfer}}; #[test] fn should_remove_transactions_from_the_pool() { let client = Arc::new(substrate_test_client::new()); let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())); let transaction = { let transfer = Transfer { amount: 5, nonce: 0, from: AccountKeyring::Alice.into(), to: Default::default(), }; let signature = AccountKeyring::from_public(&transfer.from).unwrap().sign(&transfer.encode()).into(); Extrinsic::Transfer(transfer, signature) }; // store the transaction in the pool pool.submit_one(&BlockId::hash(client.best_block_header().unwrap().hash()), transaction.clone()).unwrap(); // import the block let mut builder = client.new_block().unwrap(); builder.push(transaction.clone()).unwrap(); let block = builder.bake().unwrap(); let id = BlockId::hash(block.header().hash()); client.import(BlockOrigin::Own, block).unwrap(); // fire notification - this should clean up the queue assert_eq!(pool.status().ready, 1); on_block_imported( &id, &client, &pool, ).unwrap(); // then assert_eq!(pool.status().ready, 0); assert_eq!(pool.status().future, 0); } }