Service builder clean-up (#3906)

* Rename NewService to Service

* Move new_impl! macro to builder module

* Inline new_impl!

* Minor cleanup

* Inline the offchain_workers() function

* Fix indentation level

* Inline start_rpc

* Remove RpcBuilder trait
This commit is contained in:
Pierre Krieger
2019-10-24 15:11:38 +02:00
committed by Gavin Wood
parent c5bf9007df
commit 002057dcc5
3 changed files with 400 additions and 579 deletions
+392 -240
View File
@@ -14,9 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{NewService, NetworkStatus, NetworkState, error::{self, Error}, DEFAULT_PROTOCOL_ID};
use crate::{Service, NetworkStatus, NetworkState, error::{self, Error}, DEFAULT_PROTOCOL_ID};
use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter};
use crate::TaskExecutor;
use crate::status_sinks;
use crate::config::Configuration;
use client::{
@@ -33,13 +32,13 @@ use futures03::{
FutureExt as _, TryFutureExt as _,
StreamExt as _, TryStreamExt as _,
};
use keystore::{Store as Keystore, KeyStorePtr};
use keystore::{Store as Keystore};
use log::{info, warn};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent};
use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization};
use parking_lot::{Mutex, RwLock};
use primitives::{Blake2Hasher, H256, Hasher};
use rpc::{self, system::SystemInfo};
use rpc;
use sr_primitives::generic::BlockId;
use sr_primitives::traits::{
Block as BlockT, Extrinsic, ProvideRuntimeApi, NumberFor, One, Zero, Header, SaturatedConversion
@@ -69,7 +68,7 @@ use transaction_pool::txpool::{self, ChainApi, Pool as TransactionPool};
/// generics is done when you call `build`.
///
pub struct ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>
TNetP, TExPool, TRpc, Backend>
{
config: Configuration<TCfg, TGen, TCSExt>,
client: Arc<TCl>,
@@ -83,7 +82,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImp
network_protocol: TNetP,
transaction_pool: Arc<TExPool>,
rpc_extensions: TRpc,
rpc_builder: TRpcB,
remote_backend: Option<Arc<dyn RemoteBlockchain<TBl>>>,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
marker: PhantomData<(TBl, TRtApi)>,
}
@@ -134,7 +133,7 @@ type TLightCallExecutor<TBl, TExecDisp> = client::light::call_executor::GenesisC
>,
>;
impl<TCfg, TGen, TCSExt> ServiceBuilder<(), (), TCfg, TGen, TCSExt, (), (), (), (), (), (), (), (), (), (), ()>
impl<TCfg, TGen, TCSExt> ServiceBuilder<(), (), TCfg, TGen, TCSExt, (), (), (), (), (), (), (), (), (), ()>
where TGen: RuntimeGenesis, TCSExt: Extension {
/// Start the service builder with a configuration.
pub fn new_full<TBl: BlockT<Hash=H256>, TRtApi, TExecDisp: NativeExecutionDispatch>(
@@ -154,7 +153,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
(),
(),
(),
FullRpcBuilder<TBl, TRtApi, TExecDisp>,
TFullBackend<TBl>,
>, Error> {
let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?;
@@ -190,8 +188,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
let client = Arc::new(client);
let rpc_builder = FullRpcBuilder { client: client.clone() };
Ok(ServiceBuilder {
config,
client,
@@ -205,7 +201,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
network_protocol: (),
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
rpc_builder,
remote_backend: None,
dht_event_tx: None,
marker: PhantomData,
})
@@ -229,7 +225,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
(),
(),
(),
LightRpcBuilder<TBl, TRtApi, TExecDisp>,
TLightBackend<TBl>,
>, Error> {
let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?;
@@ -259,11 +254,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
&config.chain_spec,
executor,
)?);
let rpc_builder = LightRpcBuilder {
client: client.clone(),
remote_blockchain,
fetcher: fetcher.clone(),
};
Ok(ServiceBuilder {
config,
@@ -278,16 +268,16 @@ where TGen: RuntimeGenesis, TCSExt: Extension {
network_protocol: (),
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
rpc_builder,
remote_backend: Some(remote_blockchain),
dht_event_tx: None,
marker: PhantomData,
})
}
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, Backend>
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend>
ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend> {
TNetP, TExPool, TRpc, Backend> {
/// Returns a reference to the client that was stored in this builder.
pub fn client(&self) -> &Arc<TCl> {
@@ -311,7 +301,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
&Configuration<TCfg, TGen, TCSExt>, &Arc<Backend>
) -> Result<Option<USc>, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, USc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error> {
TNetP, TExPool, TRpc, Backend>, Error> {
let select_chain = select_chain_builder(&self.config, &self.backend)?;
Ok(ServiceBuilder {
@@ -327,7 +317,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
@@ -338,7 +328,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
self,
builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>, &Arc<Backend>) -> Result<USc, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, USc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error> {
TNetP, TExPool, TRpc, Backend>, Error> {
self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some))
}
@@ -348,7 +338,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>, Arc<TCl>, Option<TSc>, Arc<TExPool>)
-> Result<UImpQu, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error>
TNetP, TExPool, TRpc, Backend>, Error>
where TSc: Clone {
let import_queue = builder(
&self.config,
@@ -370,7 +360,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
@@ -381,7 +371,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
self,
network_protocol_builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>) -> Result<UNetP, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
UNetP, TExPool, TRpc, TRpcB, Backend>, Error> {
UNetP, TExPool, TRpc, Backend>, Error> {
let network_protocol = network_protocol_builder(&self.config)?;
Ok(ServiceBuilder {
@@ -397,7 +387,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
@@ -422,7 +412,6 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
TNetP,
TExPool,
TRpc,
TRpcB,
Backend,
>, Error> {
let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?;
@@ -440,7 +429,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
@@ -465,7 +454,6 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
TNetP,
TExPool,
TRpc,
TRpcB,
Backend,
>, Error> {
self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some))
@@ -483,7 +471,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
Arc<TExPool>,
) -> Result<(UImpQu, Option<UFprb>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error>
TNetP, TExPool, TRpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
let (import_queue, fprb) = builder(
&self.config,
@@ -507,7 +495,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
@@ -525,7 +513,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
Arc<TExPool>,
) -> Result<(UImpQu, UFprb), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error>
TNetP, TExPool, TRpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx|
builder(cfg, cl, b, f, sc, tx)
@@ -538,7 +526,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
self,
transaction_pool_builder: impl FnOnce(transaction_pool::txpool::Options, Arc<TCl>) -> Result<UExPool, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, UExPool, TRpc, TRpcB, Backend>, Error> {
TNetP, UExPool, TRpc, Backend>, Error> {
let transaction_pool = transaction_pool_builder(self.config.transaction_pool.clone(), self.client.clone())?;
Ok(ServiceBuilder {
@@ -554,7 +542,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol: self.network_protocol,
transaction_pool: Arc::new(transaction_pool),
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
@@ -565,7 +553,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
self,
rpc_ext_builder: impl FnOnce(Arc<TCl>, Arc<TExPool>) -> URpc
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, URpc, TRpcB, Backend>, Error> {
TNetP, TExPool, URpc, Backend>, Error> {
let rpc_extensions = rpc_ext_builder(self.client.clone(), self.transaction_pool.clone());
Ok(ServiceBuilder {
@@ -581,114 +569,36 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNet
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions,
rpc_builder: self.rpc_builder,
remote_backend: self.remote_backend,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery
/// module.
pub fn with_dht_event_tx(
self,
dht_event_tx: mpsc::Sender<DhtEvent>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error> {
Ok(ServiceBuilder {
config: self.config,
client: self.client,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: Some(dht_event_tx),
marker: self.marker,
})
}
}
/// RPC handlers builder.
pub trait RpcBuilder<TBl: BlockT, TBackend, TExec, TRtApi> {
/// Build chain RPC handler.
fn build_chain(&self, subscriptions: rpc::Subscriptions) -> rpc::chain::Chain<TBackend, TExec, TBl, TRtApi>;
/// Build state RPC handler.
fn build_state(&self, subscriptions: rpc::Subscriptions) -> rpc::state::State<TBackend, TExec, TBl, TRtApi>;
}
/// RPC handlers builder for full nodes.
pub struct FullRpcBuilder<TBl: BlockT, TRtApi, TExecDisp> {
client: Arc<TFullClient<TBl, TRtApi, TExecDisp>>,
}
impl<TBl, TRtApi, TExecDisp> RpcBuilder<TBl, TFullBackend<TBl>, TFullCallExecutor<TBl, TExecDisp>, TRtApi>
for
FullRpcBuilder<TBl, TRtApi, TExecDisp>
where
TBl: BlockT<Hash=H256>,
TRtApi: 'static + Send + Sync,
TExecDisp: 'static + NativeExecutionDispatch,
TFullClient<TBl, TRtApi, TExecDisp>: ProvideRuntimeApi,
<TFullClient<TBl, TRtApi, TExecDisp> as ProvideRuntimeApi>::Api: runtime_api::Metadata<TBl>,
{
fn build_chain(
&self,
subscriptions: rpc::Subscriptions,
) -> rpc::chain::Chain<TFullBackend<TBl>, TFullCallExecutor<TBl, TExecDisp>, TBl, TRtApi> {
rpc::chain::new_full(self.client.clone(), subscriptions)
}
fn build_state(
&self,
subscriptions: rpc::Subscriptions,
) -> rpc::state::State<TFullBackend<TBl>, TFullCallExecutor<TBl, TExecDisp>, TBl, TRtApi> {
rpc::state::new_full(self.client.clone(), subscriptions)
}
}
/// RPC handlers builder for light nodes.
pub struct LightRpcBuilder<TBl: BlockT<Hash=H256>, TRtApi, TExecDisp> {
client: Arc<TLightClient<TBl, TRtApi, TExecDisp>>,
remote_blockchain: Arc<dyn RemoteBlockchain<TBl>>,
fetcher: Arc<network::OnDemand<TBl>>,
}
impl<TBl, TRtApi, TExecDisp> RpcBuilder<TBl, TLightBackend<TBl>, TLightCallExecutor<TBl, TExecDisp>, TRtApi>
for
LightRpcBuilder<TBl, TRtApi, TExecDisp>
where
TBl: BlockT<Hash=H256>,
TRtApi: 'static + Send + Sync,
TExecDisp: 'static + NativeExecutionDispatch,
{
fn build_chain(
&self,
subscriptions: rpc::Subscriptions,
) -> rpc::chain::Chain<TLightBackend<TBl>, TLightCallExecutor<TBl, TExecDisp>, TBl, TRtApi> {
rpc::chain::new_light(
self.client.clone(),
subscriptions,
self.remote_blockchain.clone(),
self.fetcher.clone(),
)
}
fn build_state(
&self,
subscriptions: rpc::Subscriptions,
) -> rpc::state::State<TLightBackend<TBl>, TLightCallExecutor<TBl, TExecDisp>, TBl, TRtApi> {
rpc::state::new_light(
self.client.clone(),
subscriptions,
self.remote_blockchain.clone(),
self.fetcher.clone(),
)
/// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery
/// module.
pub fn with_dht_event_tx(
self,
dht_event_tx: mpsc::Sender<DhtEvent>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
Ok(ServiceBuilder {
config: self.config,
client: self.client,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
remote_backend: self.remote_backend,
dht_event_tx: Some(dht_event_tx),
marker: self.marker,
})
}
}
@@ -736,10 +646,10 @@ pub trait ServiceBuilderRevert {
impl<
TBl, TRtApi, TCfg, TGen, TCSExt, TBackend,
TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP,
TExPool, TRpc, TRpcB, Backend
TExPool, TRpc, Backend
> ServiceBuilderImport for ServiceBuilder<
TBl, TRtApi, TCfg, TGen, TCSExt, Client<TBackend, TExec, TBl, TRtApi>,
TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, Backend
TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend
> where
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
@@ -759,9 +669,9 @@ impl<
}
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB>
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc>
ServiceBuilderExport for ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, Client<TBackend, TExec, TBl, TRtApi>,
TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, TBackend>
TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TBackend>
where
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
@@ -782,9 +692,9 @@ where
}
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB>
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc>
ServiceBuilderRevert for ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, Client<TBackend, TExec, TBl, TRtApi>,
TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, TBackend>
TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TBackend>
where
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
@@ -801,7 +711,7 @@ where
}
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TSc, TImpQu, TNetP, TExPoolApi, TRpc, TRpcB>
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TSc, TImpQu, TNetP, TExPoolApi, TRpc>
ServiceBuilder<
TBl,
TRtApi,
@@ -817,7 +727,6 @@ ServiceBuilder<
TNetP,
TransactionPool<TExPoolApi>,
TRpc,
TRpcB,
TBackend,
> where
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi,
@@ -838,10 +747,9 @@ ServiceBuilder<
TNetP: NetworkSpecialization<TBl>,
TExPoolApi: 'static + ChainApi<Block = TBl, Hash = <TBl as BlockT>::Hash>,
TRpc: rpc::RpcExtension<rpc::Metadata> + Clone,
TRpcB: RpcBuilder<TBl, TBackend, TExec, TRtApi>,
{
/// Builds the service.
pub fn build(self) -> Result<NewService<
pub fn build(self) -> Result<Service<
TBl,
Client<TBackend, TExec, TBl, TRtApi>,
TSc,
@@ -858,7 +766,7 @@ ServiceBuilder<
marker: _,
mut config,
client,
fetcher,
fetcher: on_demand,
backend,
keystore,
select_chain,
@@ -868,8 +776,8 @@ ServiceBuilder<
network_protocol,
transaction_pool,
rpc_extensions,
remote_backend,
dht_event_tx,
rpc_builder,
} = self;
session::generate_initial_session_keys(
@@ -877,74 +785,344 @@ ServiceBuilder<
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default()
)?;
new_impl!(
TBl,
config,
move |_| -> Result<_, Error> {
Ok((
client,
fetcher,
backend,
keystore,
select_chain,
import_queue,
finality_proof_request_builder,
finality_proof_provider,
network_protocol,
transaction_pool,
rpc_extensions,
dht_event_tx,
))
let (signal, exit) = exit_future::signal();
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
let import_queue = Box::new(import_queue);
let chain_info = client.info().chain;
let version = config.full_version();
info!("Highest known block at #{}", chain_info.best_number);
telemetry!(
SUBSTRATE_INFO;
"node.start";
"height" => chain_info.best_number.saturated_into::<u64>(),
"best" => ?chain_info.best_hash
);
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
});
let protocol_id = {
let protocol_id_full = match config.chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
network::config::ProtocolId::from(protocol_id_full)
};
let block_announce_validator =
Box::new(consensus_common::block_validation::DefaultBlockAnnounceValidator::new(client.clone()));
let network_params = network::config::Params {
roles: config.roles,
network_config: config.network.clone(),
chain: client.clone(),
finality_proof_provider,
finality_proof_request_builder,
on_demand: on_demand.clone(),
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue,
protocol_id,
specialization: network_protocol,
block_announce_validator,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
let offchain_storage = backend.offchain_storage();
let offchain_workers = match (config.offchain_worker, offchain_storage) {
(true, Some(db)) => {
Some(Arc::new(offchain::OffchainWorkers::new(client.clone(), db)))
},
|h, c, tx, r| maintain_transaction_pool(h, c, tx, r),
|n, o, p, ns, v| offchain_workers(n, o, p, ns, v),
|c, ssb, si, te, tp, ext, ks| start_rpc(&rpc_builder, c, ssb, si, te, tp, ext, ks),
(true, None) => {
log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
},
_ => None,
};
{
// block notifications
let txpool = Arc::downgrade(&transaction_pool);
let wclient = Arc::downgrade(&client);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let to_spawn_tx_ = to_spawn_tx.clone();
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
let is_validator = config.roles.is_authority();
let events = client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |notification| {
let number = *notification.header.number();
let txpool = txpool.upgrade();
if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) {
let future = maintain_transaction_pool(
&BlockId::hash(notification.hash),
&client,
&*txpool,
&notification.retracted,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
let _ = to_spawn_tx_.unbounded_send(future);
}
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
if let (Some(txpool), Some(offchain)) = (txpool, offchain) {
let future = offchain.on_block_imported(&number, &txpool, network_state_info.clone(), is_validator)
.map(|()| Ok(()));
let _ = to_spawn_tx_.unbounded_send(Box::new(Compat::new(future)));
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
// extrinsic notifications
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
// Periodically notify the telemetry.
let transaction_pool_ = transaction_pool.clone();
let client_ = client.clone();
let mut sys = System::new();
let self_pid = get_current_pid().ok();
let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
let tel_task = state_rx.for_each(move |(net_status, _)| {
let info = client_.info();
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
let num_peers = net_status.num_connected_peers;
let txpool_status = transaction_pool_.status();
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
let bandwidth_download = net_status.average_download_per_sec;
let bandwidth_upload = net_status.average_upload_per_sec;
let used_state_cache_size = match info.used_state_cache_size {
Some(size) => size,
None => 0,
};
// get cpu usage and memory usage of this process
let (cpu_usage, memory) = if let Some(self_pid) = self_pid {
if sys.refresh_process(self_pid) {
let proc = sys.get_process(self_pid)
.expect("Above refresh_process succeeds, this should be Some(), qed");
(proc.cpu_usage(), proc.memory())
} else { (0.0, 0) }
} else { (0.0, 0) };
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"peers" => num_peers,
"height" => best_number,
"best" => ?best_hash,
"txcount" => txpool_status.ready,
"cpu" => cpu_usage,
"memory" => memory,
"finalized_height" => finalized_number,
"finalized_hash" => ?info.chain.finalized_hash,
"bandwidth_download" => bandwidth_download,
"bandwidth_upload" => bandwidth_upload,
"used_state_cache_size" => used_state_cache_size,
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => network_state,
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));
// RPC
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
let gen_handler = || {
use rpc::{chain, state, author, system};
let system_info = rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties().clone(),
};
let subscriptions = rpc::Subscriptions::new(Arc::new(SpawnTaskHandle {
sender: to_spawn_tx.clone(),
on_exit: exit.clone()
}));
let (chain, state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend.as_ref(), on_demand.as_ref()) {
// Light clients
let chain = rpc::chain::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
let state = rpc::state::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
(chain, state)
} else {
// Full nodes
let chain = rpc::chain::new_full(client.clone(), subscriptions.clone());
let state = rpc::state::new_full(client.clone(), subscriptions.clone());
(chain, state)
};
let author = rpc::author::Author::new(
client.clone(),
transaction_pool.clone(),
subscriptions,
keystore.clone(),
);
let system = system::System::new(system_info, system_rpc_tx.clone());
rpc_servers::rpc_handler((
state::StateApi::to_delegate(state),
chain::ChainApi::to_delegate(chain),
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions.clone(),
))
};
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers(&config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
config.roles,
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
dht_event_tx,
)
.map_err(|_| ())
.select(exit.clone())
.then(|_| Ok(()))));
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
// Telemetry
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
let is_authority = config.roles.is_authority();
let network_id = network.local_peer_id().to_base58();
let name = config.name.clone();
let impl_name = config.impl_name.to_owned();
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
let telemetry_connection_sinks_ = telemetry_connection_sinks.clone();
let telemetry = tel::init_telemetry(tel::TelemetryConfig {
endpoints,
wasm_external_transport: config.telemetry_external_transport.take(),
});
let future = telemetry.clone()
.map(|ev| Ok::<_, ()>(ev))
.compat()
.for_each(move |event| {
// Safe-guard in case we add more events in the future.
let tel::TelemetryEvent::Connected = event;
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
"version" => version.clone(),
"config" => "",
"chain" => chain_name.clone(),
"authority" => is_authority,
"network_id" => network_id.clone()
);
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
Ok(())
});
let _ = to_spawn_tx.unbounded_send(Box::new(future
.select(exit.clone())
.then(|_| Ok(()))));
telemetry
});
Ok(Service {
client,
network,
network_status_sinks,
select_chain,
transaction_pool,
exit,
signal: Some(signal),
essential_failed: Arc::new(AtomicBool::new(false)),
to_spawn_tx,
to_spawn_rx,
to_poll: Vec::new(),
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
_offchain_workers: offchain_workers,
_telemetry_on_connect_sinks: telemetry_connection_sinks.clone(),
keystore,
marker: PhantomData::<TBl>,
})
}
}
pub(crate) fn start_rpc<Api, Backend, Block, Executor, PoolApi, RpcB>(
rpc_builder: &RpcB,
client: Arc<Client<Backend, Executor, Block, Api>>,
system_send_back: futures03::channel::mpsc::UnboundedSender<rpc::system::Request<Block>>,
rpc_system_info: SystemInfo,
task_executor: TaskExecutor,
transaction_pool: Arc<TransactionPool<PoolApi>>,
rpc_extensions: impl rpc::RpcExtension<rpc::Metadata>,
keystore: KeyStorePtr,
) -> rpc_servers::RpcHandler<rpc::Metadata>
where
Block: BlockT<Hash = <Blake2Hasher as primitives::Hasher>::Out>,
Backend: client::backend::Backend<Block, Blake2Hasher> + 'static,
Client<Backend, Executor, Block, Api>: ProvideRuntimeApi,
<Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api:
runtime_api::Metadata<Block> + session::SessionKeys<Block>,
Api: Send + Sync + 'static,
Executor: client::CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone + 'static,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block> + 'static,
RpcB: RpcBuilder<Block, Backend, Executor, Api>,
{
use rpc::{chain, state, author, system};
let subscriptions = rpc::Subscriptions::new(task_executor);
let chain = rpc_builder.build_chain(subscriptions.clone());
let state = rpc_builder.build_state(subscriptions.clone());
let author = rpc::author::Author::new(
client,
transaction_pool,
subscriptions,
keystore,
);
let system = system::System::new(rpc_system_info, system_send_back);
rpc_servers::rpc_handler((
state::StateApi::to_delegate(state),
chain::ChainApi::to_delegate(chain),
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions,
))
}
pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
id: &BlockId<Block>,
client: &Arc<Client<Backend, Executor, Block, Api>>,
@@ -997,32 +1175,6 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
})
}
pub(crate) fn offchain_workers<Api, Backend, Block, Executor, PoolApi>(
number: &NumberFor<Block>,
offchain: &offchain::OffchainWorkers<
Client<Backend, Executor, Block, Api>,
<Backend as client::backend::Backend<Block, Blake2Hasher>>::OffchainStorage,
Block
>,
pool: &Arc<TransactionPool<PoolApi>>,
network_state: &Arc<dyn NetworkStateInfo + Send + Sync>,
is_validator: bool,
) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>>
where
Block: BlockT<Hash = <Blake2Hasher as primitives::Hasher>::Out>,
Backend: client::backend::Backend<Block, Blake2Hasher> + 'static,
Api: 'static,
<Backend as client::backend::Backend<Block, Blake2Hasher>>::OffchainStorage: 'static,
Client<Backend, Executor, Block, Api>: ProvideRuntimeApi + Send + Sync,
<Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api: offchain::OffchainWorkerApi<Block>,
Executor: client::CallExecutor<Block, Blake2Hasher> + 'static,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block> + 'static,
{
let future = offchain.on_block_imported(number, pool, network_state.clone(), is_validator)
.map(|()| Ok(()));
Ok(Box::new(Compat::new(future)))
}
#[cfg(test)]
mod tests {
use super::*;
+6 -337
View File
@@ -24,6 +24,7 @@ pub mod config;
pub mod chain_ops;
pub mod error;
mod builder;
mod status_sinks;
use std::io;
@@ -71,7 +72,7 @@ pub use futures::future::Executor;
const DEFAULT_PROTOCOL_ID: &str = "sup";
/// Substrate service.
pub struct NewService<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
client: Arc<TCl>,
select_chain: Option<TSc>,
network: Arc<TNet>,
@@ -129,338 +130,6 @@ impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle
}
}
macro_rules! new_impl {
(
$block:ty,
$config:ident,
$build_components:expr,
$maintain_transaction_pool:expr,
$offchain_workers:expr,
$start_rpc:expr,
) => {{
let (signal, exit) = exit_future::signal();
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
// Create all the components.
let (
client,
on_demand,
backend,
keystore,
select_chain,
import_queue,
finality_proof_request_builder,
finality_proof_provider,
network_protocol,
transaction_pool,
rpc_extensions,
dht_event_tx,
) = $build_components(&$config)?;
let import_queue = Box::new(import_queue);
let chain_info = client.info().chain;
let version = $config.full_version();
info!("Highest known block at #{}", chain_info.best_number);
telemetry!(
SUBSTRATE_INFO;
"node.start";
"height" => chain_info.best_number.saturated_into::<u64>(),
"best" => ?chain_info.best_hash
);
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !$config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
});
let protocol_id = {
let protocol_id_full = match $config.chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
network::config::ProtocolId::from(protocol_id_full)
};
let block_announce_validator =
Box::new(consensus_common::block_validation::DefaultBlockAnnounceValidator::new(client.clone()));
let network_params = network::config::Params {
roles: $config.roles,
network_config: $config.network.clone(),
chain: client.clone(),
finality_proof_provider,
finality_proof_request_builder,
on_demand,
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue,
protocol_id,
specialization: network_protocol,
block_announce_validator,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
let offchain_storage = backend.offchain_storage();
let offchain_workers = match ($config.offchain_worker, offchain_storage) {
(true, Some(db)) => {
Some(Arc::new(offchain::OffchainWorkers::new(client.clone(), db)))
},
(true, None) => {
log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
},
_ => None,
};
{
// block notifications
let txpool = Arc::downgrade(&transaction_pool);
let wclient = Arc::downgrade(&client);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let to_spawn_tx_ = to_spawn_tx.clone();
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
let is_validator = $config.roles.is_authority();
let events = client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |notification| {
let number = *notification.header.number();
let txpool = txpool.upgrade();
if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) {
let future = $maintain_transaction_pool(
&BlockId::hash(notification.hash),
&client,
&*txpool,
&notification.retracted,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
let _ = to_spawn_tx_.unbounded_send(future);
}
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
if let (Some(txpool), Some(offchain)) = (txpool, offchain) {
let future = $offchain_workers(
&number,
&offchain,
&txpool,
&network_state_info,
is_validator,
).map_err(|e| warn!("Offchain workers error processing new block: {:?}", e))?;
let _ = to_spawn_tx_.unbounded_send(future);
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
// extrinsic notifications
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
// Periodically notify the telemetry.
let transaction_pool_ = transaction_pool.clone();
let client_ = client.clone();
let mut sys = System::new();
let self_pid = get_current_pid().ok();
let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
let tel_task = state_rx.for_each(move |(net_status, _)| {
let info = client_.info();
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
let num_peers = net_status.num_connected_peers;
let txpool_status = transaction_pool_.status();
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
let bandwidth_download = net_status.average_download_per_sec;
let bandwidth_upload = net_status.average_upload_per_sec;
let used_state_cache_size = match info.used_state_cache_size {
Some(size) => size,
None => 0,
};
// get cpu usage and memory usage of this process
let (cpu_usage, memory) = if let Some(self_pid) = self_pid {
if sys.refresh_process(self_pid) {
let proc = sys.get_process(self_pid)
.expect("Above refresh_process succeeds, this should be Some(), qed");
(proc.cpu_usage(), proc.memory())
} else { (0.0, 0) }
} else { (0.0, 0) };
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"peers" => num_peers,
"height" => best_number,
"best" => ?best_hash,
"txcount" => txpool_status.ready,
"cpu" => cpu_usage,
"memory" => memory,
"finalized_height" => finalized_number,
"finalized_hash" => ?info.chain.finalized_hash,
"bandwidth_download" => bandwidth_download,
"bandwidth_upload" => bandwidth_upload,
"used_state_cache_size" => used_state_cache_size,
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => network_state,
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));
// RPC
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
let gen_handler = || {
let system_info = rpc::system::SystemInfo {
chain_name: $config.chain_spec.name().into(),
impl_name: $config.impl_name.into(),
impl_version: $config.impl_version.into(),
properties: $config.chain_spec.properties().clone(),
};
$start_rpc(
client.clone(),
//light_components.clone(),
system_rpc_tx.clone(),
system_info.clone(),
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
transaction_pool.clone(),
rpc_extensions.clone(),
keystore.clone(),
)
};
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers(&$config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
$config.roles,
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
dht_event_tx,
)
.map_err(|_| ())
.select(exit.clone())
.then(|_| Ok(()))));
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
// Telemetry
let telemetry = $config.telemetry_endpoints.clone().map(|endpoints| {
let is_authority = $config.roles.is_authority();
let network_id = network.local_peer_id().to_base58();
let name = $config.name.clone();
let impl_name = $config.impl_name.to_owned();
let version = version.clone();
let chain_name = $config.chain_spec.name().to_owned();
let telemetry_connection_sinks_ = telemetry_connection_sinks.clone();
let telemetry = tel::init_telemetry(tel::TelemetryConfig {
endpoints,
wasm_external_transport: $config.telemetry_external_transport.take(),
});
let future = telemetry.clone()
.map(|ev| Ok::<_, ()>(ev))
.compat()
.for_each(move |event| {
// Safe-guard in case we add more events in the future.
let tel::TelemetryEvent::Connected = event;
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
"version" => version.clone(),
"config" => "",
"chain" => chain_name.clone(),
"authority" => is_authority,
"network_id" => network_id.clone()
);
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
Ok(())
});
let _ = to_spawn_tx.unbounded_send(Box::new(future
.select(exit.clone())
.then(|_| Ok(()))));
telemetry
});
Ok(NewService {
client,
network,
network_status_sinks,
select_chain,
transaction_pool,
exit,
signal: Some(signal),
essential_failed: Arc::new(AtomicBool::new(false)),
to_spawn_tx,
to_spawn_rx,
to_poll: Vec::new(),
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
_offchain_workers: offchain_workers,
_telemetry_on_connect_sinks: telemetry_connection_sinks.clone(),
keystore,
marker: PhantomData::<$block>,
})
}}
}
mod builder;
/// Abstraction over a Substrate service.
pub trait AbstractService: 'static + Future<Item = (), Error = Error> +
Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send {
@@ -530,7 +199,7 @@ pub trait AbstractService: 'static + Future<Item = (), Error = Error> +
}
impl<TBl, TBackend, TExec, TRtApi, TSc, TNetSpec, TExPoolApi, TOc> AbstractService for
NewService<TBl, Client<TBackend, TExec, TBl, TRtApi>, TSc, NetworkStatus<TBl>,
Service<TBl, Client<TBackend, TExec, TBl, TRtApi>, TSc, NetworkStatus<TBl>,
NetworkService<TBl, TNetSpec, H256>, TransactionPool<TExPoolApi>, TOc>
where
TBl: BlockT<Hash = H256>,
@@ -619,7 +288,7 @@ where
}
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
NewService<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
type Item = ();
type Error = Error;
@@ -652,7 +321,7 @@ impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
}
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for
NewService<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
fn execute(
&self,
@@ -819,7 +488,7 @@ pub struct NetworkStatus<B: BlockT> {
}
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Drop for
NewService<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
fn drop(&mut self) {
debug!(target: "service", "Substrate service shutdown");
+2 -2
View File
@@ -33,7 +33,7 @@ use transaction_pool::{self, txpool::{Pool as TransactionPool}};
use inherents::InherentDataProviders;
use network::construct_simple_protocol;
use substrate_service::{NewService, NetworkStatus};
use substrate_service::{Service, NetworkStatus};
use client::{Client, LocalCallExecutor};
use client_db::Backend;
use sr_primitives::traits::Block as BlockT;
@@ -248,7 +248,7 @@ pub type NodeConfiguration<C> = Configuration<C, GenesisConfig, crate::chain_spe
/// Builds a new service for a full client.
pub fn new_full<C: Send + Default + 'static>(config: NodeConfiguration<C>)
-> Result<
NewService<
Service<
ConcreteBlock,
ConcreteClient,
LongestChain<ConcreteBackend, ConcreteBlock>,