Move pool maintainance to a background task. (#1236)

* Move pool maintainance to a background task.

* Remove commented code.

* Add issue number to TODOs

* Fix warnings and tests.
This commit is contained in:
Tomasz Drwięga
2018-12-10 13:13:48 +00:00
committed by Bastian Köcher
parent 742cb33d90
commit 507c13e31c
12 changed files with 108 additions and 132 deletions
-1
View File
@@ -50,7 +50,6 @@ extern crate clap;
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate structopt; extern crate structopt;
mod params; mod params;
+1 -39
View File
@@ -24,7 +24,6 @@ use primitives::AuthorityId;
use runtime_primitives::{ use runtime_primitives::{
Justification, Justification,
generic::{BlockId, SignedBlock}, generic::{BlockId, SignedBlock},
transaction_validity::{TransactionValidity, TransactionTag},
}; };
use consensus::{ImportBlock, ImportResult, BlockOrigin}; use consensus::{ImportBlock, ImportResult, BlockOrigin};
use runtime_primitives::traits::{ use runtime_primitives::traits::{
@@ -32,7 +31,7 @@ use runtime_primitives::traits::{
ApiRef, ProvideRuntimeApi, Digest, DigestItem, ApiRef, ProvideRuntimeApi, Digest, DigestItem,
}; };
use runtime_primitives::BuildStorage; use runtime_primitives::BuildStorage;
use runtime_api::{Core as CoreAPI, CallRuntimeAt, TaggedTransactionQueue, ConstructRuntimeApi}; use runtime_api::{Core as CoreAPI, CallRuntimeAt, ConstructRuntimeApi};
use primitives::{Blake2Hasher, H256, ChangesTrieConfiguration, convert_hash}; use primitives::{Blake2Hasher, H256, ChangesTrieConfiguration, convert_hash};
use primitives::storage::{StorageKey, StorageData}; use primitives::storage::{StorageKey, StorageData};
use primitives::storage::well_known_keys; use primitives::storage::well_known_keys;
@@ -139,8 +138,6 @@ pub struct BlockImportNotification<Block: BlockT> {
pub header: Block::Header, pub header: Block::Header,
/// Is this the new best block. /// Is this the new best block.
pub is_new_best: bool, pub is_new_best: bool,
/// Tags provided by transactions imported in that block.
pub tags: Vec<TransactionTag>,
} }
/// Summary of a finalized block. /// Summary of a finalized block.
@@ -537,37 +534,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
block_builder::BlockBuilder::at_block(parent, &self) block_builder::BlockBuilder::at_block(parent, &self)
} }
// TODO [ToDr] Optimize and re-use tags from the pool.
fn transaction_tags(
&self,
at: Block::Hash,
body: &Option<Vec<Block::Extrinsic>>
) -> error::Result<Vec<TransactionTag>> where
RA: TaggedTransactionQueue<Block>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone,
{
let id = BlockId::Hash(at);
Ok(match body {
None => vec![],
Some(ref extrinsics) => {
let mut tags = vec![];
for tx in extrinsics {
let tx = self.runtime_api().validate_transaction(&id, &tx)?;
match tx {
TransactionValidity::Valid { mut provides, .. } => {
tags.append(&mut provides);
},
// silently ignore invalid extrinsics,
// cause they might just be inherent
_ => {}
}
}
tags
},
})
}
fn execute_and_import_block( fn execute_and_import_block(
&self, &self,
origin: BlockOrigin, origin: BlockOrigin,
@@ -579,7 +545,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
finalized: bool, finalized: bool,
aux: Vec<(Vec<u8>, Option<Vec<u8>>)>, aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
) -> error::Result<ImportResult> where ) -> error::Result<ImportResult> where
RA: TaggedTransactionQueue<Block>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone, E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone,
{ {
let parent_hash = import_headers.post().parent_hash().clone(); let parent_hash = import_headers.post().parent_hash().clone();
@@ -607,7 +572,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
self.apply_finality(parent_hash, None, last_best, make_notifications)?; self.apply_finality(parent_hash, None, last_best, make_notifications)?;
} }
let tags = self.transaction_tags(parent_hash, &body)?;
let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?; let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?;
let (storage_update, changes_update, storage_changes) = match transaction.state()? { let (storage_update, changes_update, storage_changes) = match transaction.state()? {
Some(transaction_state) => { Some(transaction_state) => {
@@ -697,7 +661,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
origin, origin,
header: import_headers.into_post(), header: import_headers.into_post(),
is_new_best, is_new_best,
tags,
}; };
self.import_notification_sinks.lock() self.import_notification_sinks.lock()
@@ -1055,7 +1018,6 @@ impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA>
B: backend::Backend<Block, Blake2Hasher>, B: backend::Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync, E: CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync,
Block: BlockT<Hash=H256>, Block: BlockT<Hash=H256>,
RA: TaggedTransactionQueue<Block>
{ {
type Error = Error; type Error = Error;
@@ -29,7 +29,6 @@ extern crate sr_primitives as runtime_primitives;
extern crate tokio; extern crate tokio;
extern crate parity_codec as codec; extern crate parity_codec as codec;
#[macro_use]
extern crate parity_codec_derive; extern crate parity_codec_derive;
#[macro_use] #[macro_use]
+2 -4
View File
@@ -88,7 +88,6 @@ use client::{
error::Error as ClientError, error::ErrorKind as ClientErrorKind, error::Error as ClientError, error::ErrorKind as ClientErrorKind,
}; };
use client::blockchain::HeaderBackend; use client::blockchain::HeaderBackend;
use client::runtime_api::TaggedTransactionQueue;
use codec::{Encode, Decode}; use codec::{Encode, Decode};
use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities}; use consensus_common::{BlockImport, ImportBlock, ImportResult, Authorities};
use runtime_primitives::traits::{ use runtime_primitives::traits::{
@@ -890,9 +889,9 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
B: Backend<Block, Blake2Hasher> + 'static, B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync, E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
DigestFor<Block>: Encode, DigestFor<Block>: Encode,
RA: TaggedTransactionQueue<Block>, RA: Send + Sync,
PRA: ProvideRuntimeApi, PRA: ProvideRuntimeApi,
PRA::Api: GrandpaApi<Block> PRA::Api: GrandpaApi<Block>,
{ {
type Error = ClientError; type Error = ClientError;
@@ -1059,7 +1058,6 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> Authorities<Block> for GrandpaBloc
where where
B: Backend<Block, Blake2Hasher> + 'static, B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync, E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync,
RA: TaggedTransactionQueue<Block>, // necessary for client to import `BlockImport`.
{ {
type Error = <Client<B, E, Block, RA> as Authorities<Block>>::Error; type Error = <Client<B, E, Block, RA> as Authorities<Block>>::Error;
@@ -432,7 +432,6 @@ mod tests {
origin: BlockOrigin::File, origin: BlockOrigin::File,
header, header,
is_new_best: false, is_new_best: false,
tags: Vec::new(),
}).unwrap(); }).unwrap();
} }
} }
+1
View File
@@ -189,6 +189,7 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
io.to_disconnect.clone() io.to_disconnect.clone()
} }
#[cfg(test)]
fn with_io<'a, F, U>(&'a self, f: F) -> U where F: FnOnce(&mut TestIo<'a>) -> U { fn with_io<'a, F, U>(&'a self, f: F) -> U where F: FnOnce(&mut TestIo<'a>) -> U {
let mut io = TestIo::new(&self.queue, None); let mut io = TestIo::new(&self.queue, None);
f(&mut io) f(&mut io)
+72 -52
View File
@@ -21,15 +21,16 @@ use serde::{Serialize, de::DeserializeOwned};
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use chain_spec::{ChainSpec, Properties}; use chain_spec::{ChainSpec, Properties};
use client_db; use client_db;
use client::{self, Client, runtime_api::{TaggedTransactionQueue, Metadata}}; use client::{self, Client, runtime_api::{Metadata, TaggedTransactionQueue}};
use {error, Service, RpcConfig, maybe_start_server, TransactionPoolAdapter}; use {error, Service, RpcConfig, maybe_start_server};
use network::{self, OnDemand, import_queue::ImportQueue}; use network::{self, OnDemand, import_queue::ImportQueue};
use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch};
use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool};
use runtime_primitives::{traits::Block as BlockT, traits::Header as HeaderT, BuildStorage, generic::SignedBlock}; use runtime_primitives::{BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::{BlockId, SignedBlock}};
use config::Configuration; use config::Configuration;
use primitives::{Blake2Hasher, H256}; use primitives::{Blake2Hasher, H256};
use rpc; use rpc;
use parking_lot::Mutex;
// Type aliases. // Type aliases.
// These exist mainly to avoid typing `<F as Factory>::Foo` all over the code. // These exist mainly to avoid typing `<F as Factory>::Foo` all over the code.
@@ -118,8 +119,10 @@ impl<T: Serialize + DeserializeOwned + BuildStorage> RuntimeGenesis for T {}
/// Something that can start the RPC service. /// Something that can start the RPC service.
pub trait StartRPC<C: Components> { pub trait StartRPC<C: Components> {
type ServersHandle: Send + Sync;
fn start_rpc( fn start_rpc(
client: Arc<Client<C::Backend, C::Executor, ComponentBlock<C>, C::RuntimeApi>>, client: Arc<ComponentClient<C>>,
chain_name: String, chain_name: String,
impl_name: &'static str, impl_name: &'static str,
impl_version: &'static str, impl_version: &'static str,
@@ -128,15 +131,17 @@ pub trait StartRPC<C: Components> {
properties: Properties, properties: Properties,
task_executor: TaskExecutor, task_executor: TaskExecutor,
transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>, transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>,
) -> Result<(Option<rpc::HttpServer>, Option<rpc::WsServer>), error::Error>; ) -> error::Result<Self::ServersHandle>;
} }
impl<T: Components> StartRPC<Self> for T where impl<C: Components> StartRPC<Self> for C where
T::RuntimeApi: Metadata<ComponentBlock<T>>, C::RuntimeApi: Metadata<ComponentBlock<C>>,
for<'de> SignedBlock<ComponentBlock<T>>: ::serde::Deserialize<'de>, for<'de> SignedBlock<ComponentBlock<C>>: ::serde::Deserialize<'de>,
{ {
type ServersHandle = (Option<rpc::HttpServer>, Option<Mutex<rpc::WsServer>>);
fn start_rpc( fn start_rpc(
client: Arc<Client<T::Backend, T::Executor, ComponentBlock<T>, T::RuntimeApi>>, client: Arc<ComponentClient<C>>,
chain_name: String, chain_name: String,
impl_name: &'static str, impl_name: &'static str,
impl_version: &'static str, impl_version: &'static str,
@@ -144,8 +149,8 @@ impl<T: Components> StartRPC<Self> for T where
rpc_ws: Option<SocketAddr>, rpc_ws: Option<SocketAddr>,
properties: Properties, properties: Properties,
task_executor: TaskExecutor, task_executor: TaskExecutor,
transaction_pool: Arc<TransactionPool<T::TransactionPoolApi>>, transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>,
) -> Result<(Option<rpc::HttpServer>, Option<rpc::WsServer>), error::Error> { ) -> error::Result<Self::ServersHandle> {
let rpc_config = RpcConfig { properties, chain_name, impl_name, impl_version }; let rpc_config = RpcConfig { properties, chain_name, impl_name, impl_version };
let handler = || { let handler = || {
@@ -156,7 +161,7 @@ impl<T: Components> StartRPC<Self> for T where
let author = rpc::apis::author::Author::new( let author = rpc::apis::author::Author::new(
client.clone(), transaction_pool.clone(), subscriptions client.clone(), transaction_pool.clone(), subscriptions
); );
rpc::rpc_handler::<ComponentBlock<T>, ComponentExHash<T>, _, _, _, _>( rpc::rpc_handler::<ComponentBlock<C>, ComponentExHash<C>, _, _, _, _>(
state, state,
chain, chain,
author, author,
@@ -166,45 +171,59 @@ impl<T: Components> StartRPC<Self> for T where
Ok(( Ok((
maybe_start_server(rpc_http, |address| rpc::start_http(address, handler()))?, maybe_start_server(rpc_http, |address| rpc::start_http(address, handler()))?,
maybe_start_server(rpc_ws, |address| rpc::start_ws(address, handler()))?, maybe_start_server(rpc_ws, |address| rpc::start_ws(address, handler()))?.map(Mutex::new),
)) ))
} }
} }
/// Something that can create an instance of `network::Params`. /// Something that can maintain transaction pool on every imported block.
pub trait CreateNetworkParams<C: Components> { pub trait MaintainTransactionPool<C: Components> {
fn create_network_params<S>( fn on_block_imported(
client: Arc<Client<C::Backend, C::Executor, ComponentBlock<C>, C::RuntimeApi>>, id: &BlockId<ComponentBlock<C>>,
roles: network::config::Roles, client: &ComponentClient<C>,
network_config: network::config::NetworkConfiguration, transaction_pool: &TransactionPool<C::TransactionPoolApi>,
on_demand: Option<Arc<OnDemand<FactoryBlock<C::Factory>, NetworkService<C::Factory>>>>, ) -> error::Result<()>;
transaction_pool_adapter: TransactionPoolAdapter<C>,
specialization: S,
) -> network::config::Params<ComponentBlock<C>, S, ComponentExHash<C>>;
} }
impl<T: Components> CreateNetworkParams<Self> for T where impl<C: Components> MaintainTransactionPool<Self> for C where
T::RuntimeApi: TaggedTransactionQueue<ComponentBlock<T>> ComponentClient<C>: ProvideRuntimeApi<Api = C::RuntimeApi>,
C::RuntimeApi: TaggedTransactionQueue<ComponentBlock<C>>,
{ {
fn create_network_params<S>( // TODO [ToDr] Optimize and re-use tags from the pool.
client: Arc<Client<T::Backend, T::Executor, ComponentBlock<T>, T::RuntimeApi>>, fn on_block_imported(
roles: network::config::Roles, id: &BlockId<ComponentBlock<C>>,
network_config: network::config::NetworkConfiguration, client: &ComponentClient<C>,
on_demand: Option<Arc<OnDemand<FactoryBlock<T::Factory>, NetworkService<T::Factory>>>>, transaction_pool: &TransactionPool<C::TransactionPoolApi>,
transaction_pool_adapter: TransactionPoolAdapter<T>, ) -> error::Result<()> {
specialization: S, use runtime_primitives::transaction_validity::TransactionValidity;
) -> network::config::Params<ComponentBlock<T>, S, ComponentExHash<T>> {
network::config::Params { let block = client.block(id)?;
config: network::config::ProtocolConfig { roles }, let tags = match block {
network_config, None => return Ok(()),
chain: client, Some(block) => {
on_demand: on_demand.map(|d| d as Arc<network::OnDemandService<ComponentBlock<T>>>), let mut tags = vec![];
transaction_pool: Arc::new(transaction_pool_adapter), for tx in block.block.extrinsics() {
specialization, let tx = client.runtime_api().validate_transaction(id, &tx)?;
match tx {
TransactionValidity::Valid { mut provides, .. } => {
tags.append(&mut provides);
},
// silently ignore invalid extrinsics,
// cause they might just be inherent
_ => {}
} }
}
tags
}
};
transaction_pool.prune_tags(id, tags).map_err(|e| format!("{:?}", e))?;
Ok(())
} }
} }
/// The super trait that combines all required traits a `Service` needs to implement. /// The super trait that combines all required traits a `Service` needs to implement.
pub trait ServiceTrait<C: Components>: pub trait ServiceTrait<C: Components>:
Deref<Target = Service<C>> Deref<Target = Service<C>>
@@ -212,10 +231,10 @@ pub trait ServiceTrait<C: Components>:
+ Sync + Sync
+ 'static + 'static
+ StartRPC<C> + StartRPC<C>
+ CreateNetworkParams<C> + MaintainTransactionPool<C>
{} {}
impl<C: Components, T> ServiceTrait<C> for T where impl<C: Components, T> ServiceTrait<C> for T where
T: Deref<Target = Service<C>> + Send + Sync + 'static + StartRPC<C> + CreateNetworkParams<C> T: Deref<Target = Service<C>> + Send + Sync + 'static + StartRPC<C> + MaintainTransactionPool<C>
{} {}
/// A collection of types and methods to build a service on top of the substrate service. /// A collection of types and methods to build a service on top of the substrate service.
@@ -245,7 +264,7 @@ pub trait ServiceFactory: 'static + Sized {
/// ImportQueue for light clients /// ImportQueue for light clients
type LightImportQueue: network::import_queue::ImportQueue<Self::Block> + 'static; type LightImportQueue: network::import_queue::ImportQueue<Self::Block> + 'static;
//TODO: replace these with a constructor trait. that TransactionPool implements. //TODO: replace these with a constructor trait. that TransactionPool implements. (#1242)
/// Extrinsic pool constructor for the full client. /// Extrinsic pool constructor for the full client.
fn build_full_transaction_pool(config: TransactionPoolOptions, client: Arc<FullClient<Self>>) fn build_full_transaction_pool(config: TransactionPoolOptions, client: Arc<FullClient<Self>>)
-> Result<TransactionPool<Self::FullTransactionPoolApi>, error::Error>; -> Result<TransactionPool<Self::FullTransactionPoolApi>, error::Error>;
@@ -303,17 +322,18 @@ pub trait Components: Sized + 'static {
type Backend: 'static + client::backend::Backend<FactoryBlock<Self::Factory>, Blake2Hasher>; type Backend: 'static + client::backend::Backend<FactoryBlock<Self::Factory>, Blake2Hasher>;
/// Client executor. /// Client executor.
type Executor: 'static + client::CallExecutor<FactoryBlock<Self::Factory>, Blake2Hasher> + Send + Sync + Clone; type Executor: 'static + client::CallExecutor<FactoryBlock<Self::Factory>, Blake2Hasher> + Send + Sync + Clone;
/// Extrinsic pool type.
type TransactionPoolApi: 'static + txpool::ChainApi<
Hash = <<Self::Factory as ServiceFactory>::Block as BlockT>::Hash,
Block = FactoryBlock<Self::Factory>
>;
/// The type that implements the runtime API. /// The type that implements the runtime API.
type RuntimeApi: Send + Sync; type RuntimeApi: Send + Sync;
/// A type that can start the RPC. /// A type that can start the RPC.
type RPC: StartRPC<Self>; type RPC: StartRPC<Self>;
/// A type that can create the network params. // TODO [ToDr] Traitify transaction pool and allow people to implement their own. (#1242)
type CreateNetworkParams: CreateNetworkParams<Self>; /// A type that can maintain transaction pool.
type TransactionPool: MaintainTransactionPool<Self>;
/// Extrinsic pool type.
type TransactionPoolApi: 'static + txpool::ChainApi<
Hash = <FactoryBlock<Self::Factory> as BlockT>::Hash,
Block = FactoryBlock<Self::Factory>
>;
/// Our Import Queue /// Our Import Queue
type ImportQueue: ImportQueue<FactoryBlock<Self::Factory>> + 'static; type ImportQueue: ImportQueue<FactoryBlock<Self::Factory>> + 'static;
@@ -383,7 +403,7 @@ impl<Factory: ServiceFactory> Components for FullComponents<Factory> {
type ImportQueue = Factory::FullImportQueue; type ImportQueue = Factory::FullImportQueue;
type RuntimeApi = Factory::RuntimeApi; type RuntimeApi = Factory::RuntimeApi;
type RPC = Factory::FullService; type RPC = Factory::FullService;
type CreateNetworkParams = Factory::FullService; type TransactionPool = Factory::FullService;
fn build_client( fn build_client(
config: &FactoryFullConfiguration<Factory>, config: &FactoryFullConfiguration<Factory>,
@@ -458,7 +478,7 @@ impl<Factory: ServiceFactory> Components for LightComponents<Factory> {
type ImportQueue = <Factory as ServiceFactory>::LightImportQueue; type ImportQueue = <Factory as ServiceFactory>::LightImportQueue;
type RuntimeApi = Factory::RuntimeApi; type RuntimeApi = Factory::RuntimeApi;
type RPC = Factory::LightService; type RPC = Factory::LightService;
type CreateNetworkParams = Factory::LightService; type TransactionPool = Factory::LightService;
fn build_client( fn build_client(
config: &FactoryFullConfiguration<Factory>, config: &FactoryFullConfiguration<Factory>,
+1
View File
@@ -24,6 +24,7 @@ error_chain! {
foreign_links { foreign_links {
Io(::std::io::Error) #[doc="IO error"]; Io(::std::io::Error) #[doc="IO error"];
} }
links { links {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"]; Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"];
+28 -30
View File
@@ -63,11 +63,10 @@ use std::collections::HashMap;
#[doc(hidden)] #[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc}; pub use std::{ops::Deref, result::Result, sync::Arc};
use futures::prelude::*; use futures::prelude::*;
use parking_lot::Mutex;
use keystore::Store as Keystore; use keystore::Store as Keystore;
use client::BlockchainEvents; use client::BlockchainEvents;
use runtime_primitives::traits::{Header, As};
use runtime_primitives::generic::BlockId; use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Header, As};
use exit_future::Signal; use exit_future::Signal;
#[doc(hidden)] #[doc(hidden)]
pub use tokio::runtime::TaskExecutor; pub use tokio::runtime::TaskExecutor;
@@ -88,7 +87,7 @@ pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis,
ComponentExHash, ComponentExtrinsic, FactoryExtrinsic ComponentExHash, ComponentExtrinsic, FactoryExtrinsic
}; };
use components::{StartRPC, CreateNetworkParams}; use components::{StartRPC, MaintainTransactionPool};
#[doc(hidden)] #[doc(hidden)]
pub use network::OnDemand; pub use network::OnDemand;
@@ -104,8 +103,7 @@ pub struct Service<Components: components::Components> {
signal: Option<Signal>, signal: Option<Signal>,
/// Configuration of this Service /// Configuration of this Service
pub config: FactoryFullConfiguration<Components::Factory>, pub config: FactoryFullConfiguration<Components::Factory>,
_rpc_http: Option<rpc::HttpServer>, _rpc: Box<::std::any::Any + Send + Sync>,
_rpc_ws: Option<Mutex<rpc::WsServer>>, // WsServer is not `Sync`, but the service needs to be.
_telemetry: Option<tel::Telemetry>, _telemetry: Option<tel::Telemetry>,
} }
@@ -121,12 +119,7 @@ pub fn new_client<Factory: components::ServiceFactory>(config: &FactoryFullConfi
Ok(client) Ok(client)
} }
impl<Components> Service<Components> impl<Components: components::Components> Service<Components> {
where
Components: components::Components,
<Components as components::Components>::Executor: std::clone::Clone,
// <Components as components::Components>::RuntimeApi: client::runtime_api::BlockBuilder<<<Components as components::Components>::Factory as ServiceFactory>::Block, Error=client::error::Error, OverlayedChanges=client::runtime_api::OverlayedChanges> + Sync + Send + client::runtime_api::Core<<<Components as components::Components>::Factory as components::ServiceFactory>::Block, primitives::AuthorityId, Error=client::error::Error, OverlayedChanges=client::runtime_api::OverlayedChanges> + client::runtime_api::ConstructRuntimeApi<Block=<<Components as components::Components>::Factory as ServiceFactory>::Block> + client::runtime_api::Metadata<<<Components as components::Components>::Factory as components::ServiceFactory>::Block, Vec<u8>, Error=client::error::Error> + client::runtime_api::TaggedTransactionQueue<<<Components as components::Components>::Factory as components::ServiceFactory>::Block, Error=client::error::Error>,
{
/// Creates a new service. /// Creates a new service.
pub fn new( pub fn new(
mut config: FactoryFullConfiguration<Components::Factory>, mut config: FactoryFullConfiguration<Components::Factory>,
@@ -170,20 +163,20 @@ impl<Components> Service<Components>
let transaction_pool = Arc::new( let transaction_pool = Arc::new(
Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())? Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())?
); );
let transaction_pool_adapter = TransactionPoolAdapter::<Components> { let transaction_pool_adapter = Arc::new(TransactionPoolAdapter::<Components> {
imports_external_transactions: !(config.roles == Roles::LIGHT), imports_external_transactions: !(config.roles == Roles::LIGHT),
pool: transaction_pool.clone(), pool: transaction_pool.clone(),
client: client.clone(), client: client.clone(),
}; });
let network_params = Components::CreateNetworkParams::create_network_params( let network_params = network::config::Params {
client.clone(), config: network::config::ProtocolConfig { roles: config.roles },
config.roles, network_config: config.network.clone(),
config.network.clone(), chain: client.clone(),
on_demand.clone(), on_demand: on_demand.as_ref().map(|d| d.clone() as _),
transaction_pool_adapter, transaction_pool: transaction_pool_adapter.clone() as _,
network_protocol, specialization: network_protocol,
); };
let protocol_id = { let protocol_id = {
let protocol_id_full = config.chain_spec.protocol_id().unwrap_or(DEFAULT_PROTOCOL_ID).as_bytes(); let protocol_id_full = config.chain_spec.protocol_id().unwrap_or(DEFAULT_PROTOCOL_ID).as_bytes();
@@ -206,15 +199,21 @@ impl<Components> Service<Components>
{ {
// block notifications // block notifications
let network = Arc::downgrade(&network); let network = Arc::downgrade(&network);
let txpool = transaction_pool.clone(); let txpool = Arc::downgrade(&transaction_pool);
let wclient = Arc::downgrade(&client);
let events = client.import_notification_stream() let events = client.import_notification_stream()
.for_each(move |notification| { .for_each(move |notification| {
if let Some(network) = network.upgrade() { if let Some(network) = network.upgrade() {
network.on_block_imported(notification.hash, &notification.header); network.on_block_imported(notification.hash, &notification.header);
} }
txpool.prune_tags(&BlockId::hash(notification.hash), notification.tags) if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) {
.map_err(|e| warn!("Error removing extrinsics: {:?}", e))?; Components::TransactionPool::on_block_imported(
&BlockId::hash(notification.hash),
&*client,
&*txpool,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
}
Ok(()) Ok(())
}) })
.select(exit.clone()) .select(exit.clone())
@@ -241,7 +240,7 @@ impl<Components> Service<Components>
// RPC // RPC
let (rpc_http, rpc_ws) = Components::RPC::start_rpc( let rpc = Components::RPC::start_rpc(
client.clone(), config.chain_spec.name().to_string(), config.impl_name, client.clone(), config.chain_spec.name().to_string(), config.impl_name,
config.impl_version, config.rpc_http, config.rpc_ws, config.chain_spec.properties(), config.impl_version, config.rpc_http, config.rpc_ws, config.chain_spec.properties(),
task_executor.clone(), transaction_pool.clone() task_executor.clone(), transaction_pool.clone()
@@ -275,15 +274,14 @@ impl<Components> Service<Components>
}; };
Ok(Service { Ok(Service {
client: client, client,
network: Some(network), network: Some(network),
transaction_pool: transaction_pool, transaction_pool,
signal: Some(signal), signal: Some(signal),
keystore: keystore, keystore,
config, config,
exit, exit,
_rpc_http: rpc_http, _rpc: Box::new(rpc),
_rpc_ws: rpc_ws.map(Mutex::new),
_telemetry: telemetry, _telemetry: telemetry,
}) })
} }
+1 -1
View File
@@ -27,7 +27,7 @@ extern crate slog_async;
extern crate slog_json; extern crate slog_json;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use(o, kv)] #[macro_use(o)]
extern crate slog; extern crate slog;
extern crate slog_scope; extern crate slog_scope;
@@ -54,7 +54,7 @@ pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, ExtrinsicFor<A>>>;
pub trait ChainApi: Send + Sync { pub trait ChainApi: Send + Sync {
/// Block type. /// Block type.
type Block: traits::Block; type Block: traits::Block;
/// Hash type /// Transaction Hash type
type Hash: hash::Hash + Eq + traits::Member + Serialize; type Hash: hash::Hash + Eq + traits::Member + Serialize;
/// Error type. /// Error type.
type Error: From<error::Error> + error::IntoPoolError; type Error: From<error::Error> + error::IntoPoolError;
-1
View File
@@ -30,7 +30,6 @@ pub extern crate sr_primitives as runtime_primitives;
extern crate srml_metadata; extern crate srml_metadata;
extern crate mashup; extern crate mashup;
#[macro_use] #[macro_use]
extern crate srml_support_procedural; extern crate srml_support_procedural;