diff --git a/substrate/core/cli/src/lib.rs b/substrate/core/cli/src/lib.rs index 5821eddfef..0e06c6232d 100644 --- a/substrate/core/cli/src/lib.rs +++ b/substrate/core/cli/src/lib.rs @@ -50,7 +50,6 @@ extern crate clap; extern crate error_chain; #[macro_use] extern crate log; -#[macro_use] extern crate structopt; mod params; diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 2c85bbd588..a88aa6422c 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -24,7 +24,6 @@ use primitives::AuthorityId; use runtime_primitives::{ Justification, generic::{BlockId, SignedBlock}, - transaction_validity::{TransactionValidity, TransactionTag}, }; use consensus::{ImportBlock, ImportResult, BlockOrigin}; use runtime_primitives::traits::{ @@ -32,7 +31,7 @@ use runtime_primitives::traits::{ ApiRef, ProvideRuntimeApi, Digest, DigestItem, }; use runtime_primitives::BuildStorage; -use runtime_api::{Core as CoreAPI, CallRuntimeAt, TaggedTransactionQueue, ConstructRuntimeApi}; +use runtime_api::{Core as CoreAPI, CallRuntimeAt, ConstructRuntimeApi}; use primitives::{Blake2Hasher, H256, ChangesTrieConfiguration, convert_hash}; use primitives::storage::{StorageKey, StorageData}; use primitives::storage::well_known_keys; @@ -139,8 +138,6 @@ pub struct BlockImportNotification { pub header: Block::Header, /// Is this the new best block. pub is_new_best: bool, - /// Tags provided by transactions imported in that block. - pub tags: Vec, } /// Summary of a finalized block. @@ -537,37 +534,6 @@ impl Client where block_builder::BlockBuilder::at_block(parent, &self) } - // TODO [ToDr] Optimize and re-use tags from the pool. - fn transaction_tags( - &self, - at: Block::Hash, - body: &Option> - ) -> error::Result> where - RA: TaggedTransactionQueue, - E: CallExecutor + Send + Sync + Clone, - { - let id = BlockId::Hash(at); - Ok(match body { - None => vec![], - Some(ref extrinsics) => { - let mut tags = vec![]; - for tx in extrinsics { - let tx = self.runtime_api().validate_transaction(&id, &tx)?; - match tx { - TransactionValidity::Valid { mut provides, .. } => { - tags.append(&mut provides); - }, - // silently ignore invalid extrinsics, - // cause they might just be inherent - _ => {} - } - - } - tags - }, - }) - } - fn execute_and_import_block( &self, origin: BlockOrigin, @@ -579,7 +545,6 @@ impl Client where finalized: bool, aux: Vec<(Vec, Option>)>, ) -> error::Result where - RA: TaggedTransactionQueue, E: CallExecutor + Send + Sync + Clone, { let parent_hash = import_headers.post().parent_hash().clone(); @@ -607,7 +572,6 @@ impl Client where self.apply_finality(parent_hash, None, last_best, make_notifications)?; } - let tags = self.transaction_tags(parent_hash, &body)?; let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?; let (storage_update, changes_update, storage_changes) = match transaction.state()? { Some(transaction_state) => { @@ -697,7 +661,6 @@ impl Client where origin, header: import_headers.into_post(), is_new_best, - tags, }; self.import_notification_sinks.lock() @@ -1055,7 +1018,6 @@ impl consensus::BlockImport for Client B: backend::Backend, E: CallExecutor + Clone + Send + Sync, Block: BlockT, - RA: TaggedTransactionQueue { type Error = Error; diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 227131914c..fdba87b1c7 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -29,7 +29,6 @@ extern crate sr_primitives as runtime_primitives; extern crate tokio; extern crate parity_codec as codec; -#[macro_use] extern crate parity_codec_derive; #[macro_use] diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 1a800cd44d..c6f55e89ad 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -88,7 +88,6 @@ use client::{ error::Error as ClientError, error::ErrorKind as ClientErrorKind, }; use client::blockchain::HeaderBackend; -use client::runtime_api::TaggedTransactionQueue; use codec::{Encode, Decode}; use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities}; use runtime_primitives::traits::{ @@ -890,9 +889,9 @@ impl, RA, PRA> BlockImport B: Backend + 'static, E: CallExecutor + 'static + Clone + Send + Sync, DigestFor: Encode, - RA: TaggedTransactionQueue, + RA: Send + Sync, PRA: ProvideRuntimeApi, - PRA::Api: GrandpaApi + PRA::Api: GrandpaApi, { type Error = ClientError; @@ -1059,7 +1058,6 @@ impl, RA, PRA> Authorities for GrandpaBloc where B: Backend + 'static, E: CallExecutor + 'static + Clone + Send + Sync, - RA: TaggedTransactionQueue, // necessary for client to import `BlockImport`. { type Error = as Authorities>::Error; diff --git a/substrate/core/finality-grandpa/src/until_imported.rs b/substrate/core/finality-grandpa/src/until_imported.rs index aba0472131..028b710acb 100644 --- a/substrate/core/finality-grandpa/src/until_imported.rs +++ b/substrate/core/finality-grandpa/src/until_imported.rs @@ -432,7 +432,6 @@ mod tests { origin: BlockOrigin::File, header, is_new_best: false, - tags: Vec::new(), }).unwrap(); } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index daa961f8a2..6b0d36a1fc 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -189,6 +189,7 @@ impl, D> Peer { io.to_disconnect.clone() } + #[cfg(test)] fn with_io<'a, F, U>(&'a self, f: F) -> U where F: FnOnce(&mut TestIo<'a>) -> U { let mut io = TestIo::new(&self.queue, None); f(&mut io) diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index cc6771a634..fe2e7df385 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -21,15 +21,16 @@ use serde::{Serialize, de::DeserializeOwned}; use tokio::runtime::TaskExecutor; use chain_spec::{ChainSpec, Properties}; use client_db; -use client::{self, Client, runtime_api::{TaggedTransactionQueue, Metadata}}; -use {error, Service, RpcConfig, maybe_start_server, TransactionPoolAdapter}; +use client::{self, Client, runtime_api::{Metadata, TaggedTransactionQueue}}; +use {error, Service, RpcConfig, maybe_start_server}; use network::{self, OnDemand, import_queue::ImportQueue}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; -use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, BuildStorage, generic::SignedBlock}; +use runtime_primitives::{BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::{BlockId, SignedBlock}}; use config::Configuration; use primitives::{Blake2Hasher, H256}; use rpc; +use parking_lot::Mutex; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. @@ -118,8 +119,10 @@ impl RuntimeGenesis for T {} /// Something that can start the RPC service. pub trait StartRPC { + type ServersHandle: Send + Sync; + fn start_rpc( - client: Arc, C::RuntimeApi>>, + client: Arc>, chain_name: String, impl_name: &'static str, impl_version: &'static str, @@ -128,15 +131,17 @@ pub trait StartRPC { properties: Properties, task_executor: TaskExecutor, transaction_pool: Arc>, - ) -> Result<(Option, Option), error::Error>; + ) -> error::Result; } -impl StartRPC for T where - T::RuntimeApi: Metadata>, - for<'de> SignedBlock>: ::serde::Deserialize<'de>, +impl StartRPC for C where + C::RuntimeApi: Metadata>, + for<'de> SignedBlock>: ::serde::Deserialize<'de>, { + type ServersHandle = (Option, Option>); + fn start_rpc( - client: Arc, T::RuntimeApi>>, + client: Arc>, chain_name: String, impl_name: &'static str, impl_version: &'static str, @@ -144,8 +149,8 @@ impl StartRPC for T where rpc_ws: Option, properties: Properties, task_executor: TaskExecutor, - transaction_pool: Arc>, - ) -> Result<(Option, Option), error::Error> { + transaction_pool: Arc>, + ) -> error::Result { let rpc_config = RpcConfig { properties, chain_name, impl_name, impl_version }; let handler = || { @@ -156,7 +161,7 @@ impl StartRPC for T where let author = rpc::apis::author::Author::new( client.clone(), transaction_pool.clone(), subscriptions ); - rpc::rpc_handler::, ComponentExHash, _, _, _, _>( + rpc::rpc_handler::, ComponentExHash, _, _, _, _>( state, chain, author, @@ -166,45 +171,59 @@ impl StartRPC for T where Ok(( maybe_start_server(rpc_http, |address| rpc::start_http(address, handler()))?, - maybe_start_server(rpc_ws, |address| rpc::start_ws(address, handler()))?, + maybe_start_server(rpc_ws, |address| rpc::start_ws(address, handler()))?.map(Mutex::new), )) } } -/// Something that can create an instance of `network::Params`. -pub trait CreateNetworkParams { - fn create_network_params( - client: Arc, C::RuntimeApi>>, - roles: network::config::Roles, - network_config: network::config::NetworkConfiguration, - on_demand: Option, NetworkService>>>, - transaction_pool_adapter: TransactionPoolAdapter, - specialization: S, - ) -> network::config::Params, S, ComponentExHash>; +/// Something that can maintain transaction pool on every imported block. +pub trait MaintainTransactionPool { + fn on_block_imported( + id: &BlockId>, + client: &ComponentClient, + transaction_pool: &TransactionPool, + ) -> error::Result<()>; } -impl CreateNetworkParams for T where - T::RuntimeApi: TaggedTransactionQueue> +impl MaintainTransactionPool for C where + ComponentClient: ProvideRuntimeApi, + C::RuntimeApi: TaggedTransactionQueue>, { - fn create_network_params( - client: Arc, T::RuntimeApi>>, - roles: network::config::Roles, - network_config: network::config::NetworkConfiguration, - on_demand: Option, NetworkService>>>, - transaction_pool_adapter: TransactionPoolAdapter, - specialization: S, - ) -> network::config::Params, S, ComponentExHash> { - network::config::Params { - config: network::config::ProtocolConfig { roles }, - network_config, - chain: client, - on_demand: on_demand.map(|d| d as Arc>>), - transaction_pool: Arc::new(transaction_pool_adapter), - specialization, - } + // TODO [ToDr] Optimize and re-use tags from the pool. + fn on_block_imported( + id: &BlockId>, + client: &ComponentClient, + transaction_pool: &TransactionPool, + ) -> error::Result<()> { + use runtime_primitives::transaction_validity::TransactionValidity; + + let block = client.block(id)?; + let tags = match block { + None => return Ok(()), + Some(block) => { + let mut tags = vec![]; + for tx in block.block.extrinsics() { + let tx = client.runtime_api().validate_transaction(id, &tx)?; + match tx { + TransactionValidity::Valid { mut provides, .. } => { + tags.append(&mut provides); + }, + // silently ignore invalid extrinsics, + // cause they might just be inherent + _ => {} + } + + } + tags + } + }; + + transaction_pool.prune_tags(id, tags).map_err(|e| format!("{:?}", e))?; + Ok(()) } } + /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> @@ -212,10 +231,10 @@ pub trait ServiceTrait: + Sync + 'static + StartRPC - + CreateNetworkParams + + MaintainTransactionPool {} impl ServiceTrait for T where - T: Deref> + Send + Sync + 'static + StartRPC + CreateNetworkParams + T: Deref> + Send + Sync + 'static + StartRPC + MaintainTransactionPool {} /// A collection of types and methods to build a service on top of the substrate service. @@ -245,7 +264,7 @@ pub trait ServiceFactory: 'static + Sized { /// ImportQueue for light clients type LightImportQueue: network::import_queue::ImportQueue + 'static; - //TODO: replace these with a constructor trait. that TransactionPool implements. + //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242) /// Extrinsic pool constructor for the full client. fn build_full_transaction_pool(config: TransactionPoolOptions, client: Arc>) -> Result, error::Error>; @@ -303,17 +322,18 @@ pub trait Components: Sized + 'static { type Backend: 'static + client::backend::Backend, Blake2Hasher>; /// Client executor. type Executor: 'static + client::CallExecutor, Blake2Hasher> + Send + Sync + Clone; - /// Extrinsic pool type. - type TransactionPoolApi: 'static + txpool::ChainApi< - Hash = <::Block as BlockT>::Hash, - Block = FactoryBlock - >; /// The type that implements the runtime API. type RuntimeApi: Send + Sync; /// A type that can start the RPC. type RPC: StartRPC; - /// A type that can create the network params. - type CreateNetworkParams: CreateNetworkParams; + // TODO [ToDr] Traitify transaction pool and allow people to implement their own. (#1242) + /// A type that can maintain transaction pool. + type TransactionPool: MaintainTransactionPool; + /// Extrinsic pool type. + type TransactionPoolApi: 'static + txpool::ChainApi< + Hash = as BlockT>::Hash, + Block = FactoryBlock + >; /// Our Import Queue type ImportQueue: ImportQueue> + 'static; @@ -383,7 +403,7 @@ impl Components for FullComponents { type ImportQueue = Factory::FullImportQueue; type RuntimeApi = Factory::RuntimeApi; type RPC = Factory::FullService; - type CreateNetworkParams = Factory::FullService; + type TransactionPool = Factory::FullService; fn build_client( config: &FactoryFullConfiguration, @@ -458,7 +478,7 @@ impl Components for LightComponents { type ImportQueue = ::LightImportQueue; type RuntimeApi = Factory::RuntimeApi; type RPC = Factory::LightService; - type CreateNetworkParams = Factory::LightService; + type TransactionPool = Factory::LightService; fn build_client( config: &FactoryFullConfiguration, diff --git a/substrate/core/service/src/error.rs b/substrate/core/service/src/error.rs index ce6f61ca04..c3bacd1980 100644 --- a/substrate/core/service/src/error.rs +++ b/substrate/core/service/src/error.rs @@ -24,6 +24,7 @@ error_chain! { foreign_links { Io(::std::io::Error) #[doc="IO error"]; } + links { Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"]; diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index f5e578d72f..603f9df82a 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -63,11 +63,10 @@ use std::collections::HashMap; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; use futures::prelude::*; -use parking_lot::Mutex; use keystore::Store as Keystore; use client::BlockchainEvents; -use runtime_primitives::traits::{Header, As}; use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Header, As}; use exit_future::Signal; #[doc(hidden)] pub use tokio::runtime::TaskExecutor; @@ -88,7 +87,7 @@ pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, ComponentExHash, ComponentExtrinsic, FactoryExtrinsic }; -use components::{StartRPC, CreateNetworkParams}; +use components::{StartRPC, MaintainTransactionPool}; #[doc(hidden)] pub use network::OnDemand; @@ -104,8 +103,7 @@ pub struct Service { signal: Option, /// Configuration of this Service pub config: FactoryFullConfiguration, - _rpc_http: Option, - _rpc_ws: Option>, // WsServer is not `Sync`, but the service needs to be. + _rpc: Box<::std::any::Any + Send + Sync>, _telemetry: Option, } @@ -121,12 +119,7 @@ pub fn new_client(config: &FactoryFullConfi Ok(client) } -impl Service - where - Components: components::Components, - ::Executor: std::clone::Clone, - // ::RuntimeApi: client::runtime_api::BlockBuilder<<::Factory as ServiceFactory>::Block, Error=client::error::Error, OverlayedChanges=client::runtime_api::OverlayedChanges> + Sync + Send + client::runtime_api::Core<<::Factory as components::ServiceFactory>::Block, primitives::AuthorityId, Error=client::error::Error, OverlayedChanges=client::runtime_api::OverlayedChanges> + client::runtime_api::ConstructRuntimeApi::Factory as ServiceFactory>::Block> + client::runtime_api::Metadata<<::Factory as components::ServiceFactory>::Block, Vec, Error=client::error::Error> + client::runtime_api::TaggedTransactionQueue<<::Factory as components::ServiceFactory>::Block, Error=client::error::Error>, -{ +impl Service { /// Creates a new service. pub fn new( mut config: FactoryFullConfiguration, @@ -170,20 +163,20 @@ impl Service let transaction_pool = Arc::new( Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())? ); - let transaction_pool_adapter = TransactionPoolAdapter:: { + let transaction_pool_adapter = Arc::new(TransactionPoolAdapter:: { imports_external_transactions: !(config.roles == Roles::LIGHT), pool: transaction_pool.clone(), client: client.clone(), - }; + }); - let network_params = Components::CreateNetworkParams::create_network_params( - client.clone(), - config.roles, - config.network.clone(), - on_demand.clone(), - transaction_pool_adapter, - network_protocol, - ); + let network_params = network::config::Params { + config: network::config::ProtocolConfig { roles: config.roles }, + network_config: config.network.clone(), + chain: client.clone(), + on_demand: on_demand.as_ref().map(|d| d.clone() as _), + transaction_pool: transaction_pool_adapter.clone() as _, + specialization: network_protocol, + }; let protocol_id = { let protocol_id_full = config.chain_spec.protocol_id().unwrap_or(DEFAULT_PROTOCOL_ID).as_bytes(); @@ -206,15 +199,21 @@ impl Service { // block notifications let network = Arc::downgrade(&network); - let txpool = transaction_pool.clone(); + let txpool = Arc::downgrade(&transaction_pool); + let wclient = Arc::downgrade(&client); let events = client.import_notification_stream() .for_each(move |notification| { if let Some(network) = network.upgrade() { network.on_block_imported(notification.hash, ¬ification.header); } - txpool.prune_tags(&BlockId::hash(notification.hash), notification.tags) - .map_err(|e| warn!("Error removing extrinsics: {:?}", e))?; + if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) { + Components::TransactionPool::on_block_imported( + &BlockId::hash(notification.hash), + &*client, + &*txpool, + ).map_err(|e| warn!("Pool error processing new block: {:?}", e))?; + } Ok(()) }) .select(exit.clone()) @@ -241,7 +240,7 @@ impl Service // RPC - let (rpc_http, rpc_ws) = Components::RPC::start_rpc( + let rpc = Components::RPC::start_rpc( client.clone(), config.chain_spec.name().to_string(), config.impl_name, config.impl_version, config.rpc_http, config.rpc_ws, config.chain_spec.properties(), task_executor.clone(), transaction_pool.clone() @@ -275,15 +274,14 @@ impl Service }; Ok(Service { - client: client, + client, network: Some(network), - transaction_pool: transaction_pool, + transaction_pool, signal: Some(signal), - keystore: keystore, + keystore, config, exit, - _rpc_http: rpc_http, - _rpc_ws: rpc_ws.map(Mutex::new), + _rpc: Box::new(rpc), _telemetry: telemetry, }) } diff --git a/substrate/core/telemetry/src/lib.rs b/substrate/core/telemetry/src/lib.rs index 64e55d48b2..927994cca5 100644 --- a/substrate/core/telemetry/src/lib.rs +++ b/substrate/core/telemetry/src/lib.rs @@ -27,7 +27,7 @@ extern crate slog_async; extern crate slog_json; #[macro_use] extern crate log; -#[macro_use(o, kv)] +#[macro_use(o)] extern crate slog; extern crate slog_scope; diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs index d9b4b33f36..a16db8206e 100644 --- a/substrate/core/transaction-pool/graph/src/pool.rs +++ b/substrate/core/transaction-pool/graph/src/pool.rs @@ -54,7 +54,7 @@ pub type TransactionFor = Arc, ExtrinsicFor>>; pub trait ChainApi: Send + Sync { /// Block type. type Block: traits::Block; - /// Hash type + /// Transaction Hash type type Hash: hash::Hash + Eq + traits::Member + Serialize; /// Error type. type Error: From + error::IntoPoolError; diff --git a/substrate/srml/support/src/lib.rs b/substrate/srml/support/src/lib.rs index f825ef9027..2f4b9b3ecb 100644 --- a/substrate/srml/support/src/lib.rs +++ b/substrate/srml/support/src/lib.rs @@ -30,7 +30,6 @@ pub extern crate sr_primitives as runtime_primitives; extern crate srml_metadata; extern crate mashup; - #[macro_use] extern crate srml_support_procedural;