diff --git a/substrate/core/service/src/builder.rs b/substrate/core/service/src/builder.rs index cf91d1b267..b2a6cc731a 100644 --- a/substrate/core/service/src/builder.rs +++ b/substrate/core/service/src/builder.rs @@ -14,9 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::{NewService, NetworkStatus, NetworkState, error::{self, Error}, DEFAULT_PROTOCOL_ID}; +use crate::{Service, NetworkStatus, NetworkState, error::{self, Error}, DEFAULT_PROTOCOL_ID}; use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter}; -use crate::TaskExecutor; use crate::status_sinks; use crate::config::Configuration; use client::{ @@ -33,13 +32,13 @@ use futures03::{ FutureExt as _, TryFutureExt as _, StreamExt as _, TryStreamExt as _, }; -use keystore::{Store as Keystore, KeyStorePtr}; +use keystore::{Store as Keystore}; use log::{info, warn}; use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use primitives::{Blake2Hasher, H256, Hasher}; -use rpc::{self, system::SystemInfo}; +use rpc; use sr_primitives::generic::BlockId; use sr_primitives::traits::{ Block as BlockT, Extrinsic, ProvideRuntimeApi, NumberFor, One, Zero, Header, SaturatedConversion @@ -69,7 +68,7 @@ use transaction_pool::txpool::{self, ChainApi, Pool as TransactionPool}; /// generics is done when you call `build`. /// pub struct ServiceBuilder + TNetP, TExPool, TRpc, Backend> { config: Configuration, client: Arc, @@ -83,7 +82,7 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, - rpc_builder: TRpcB, + remote_backend: Option>>, dht_event_tx: Option>, marker: PhantomData<(TBl, TRtApi)>, } @@ -134,7 +133,7 @@ type TLightCallExecutor = client::light::call_executor::GenesisC >, >; -impl ServiceBuilder<(), (), TCfg, TGen, TCSExt, (), (), (), (), (), (), (), (), (), (), ()> +impl ServiceBuilder<(), (), TCfg, TGen, TCSExt, (), (), (), (), (), (), (), (), (), ()> where TGen: RuntimeGenesis, TCSExt: Extension { /// Start the service builder with a configuration. pub fn new_full, TRtApi, TExecDisp: NativeExecutionDispatch>( @@ -154,7 +153,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { (), (), (), - FullRpcBuilder, TFullBackend, >, Error> { let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; @@ -190,8 +188,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { let client = Arc::new(client); - let rpc_builder = FullRpcBuilder { client: client.clone() }; - Ok(ServiceBuilder { config, client, @@ -205,7 +201,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), - rpc_builder, + remote_backend: None, dht_event_tx: None, marker: PhantomData, }) @@ -229,7 +225,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { (), (), (), - LightRpcBuilder, TLightBackend, >, Error> { let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; @@ -259,11 +254,6 @@ where TGen: RuntimeGenesis, TCSExt: Extension { &config.chain_spec, executor, )?); - let rpc_builder = LightRpcBuilder { - client: client.clone(), - remote_blockchain, - fetcher: fetcher.clone(), - }; Ok(ServiceBuilder { config, @@ -278,16 +268,16 @@ where TGen: RuntimeGenesis, TCSExt: Extension { network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), - rpc_builder, + remote_backend: Some(remote_blockchain), dht_event_tx: None, marker: PhantomData, }) } } -impl +impl ServiceBuilder { + TNetP, TExPool, TRpc, Backend> { /// Returns a reference to the client that was stored in this builder. pub fn client(&self) -> &Arc { @@ -311,7 +301,7 @@ impl, &Arc ) -> Result, Error> ) -> Result, Error> { + TNetP, TExPool, TRpc, Backend>, Error> { let select_chain = select_chain_builder(&self.config, &self.backend)?; Ok(ServiceBuilder { @@ -327,7 +317,7 @@ impl, &Arc) -> Result ) -> Result, Error> { + TNetP, TExPool, TRpc, Backend>, Error> { self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some)) } @@ -348,7 +338,7 @@ impl, Arc, Option, Arc) -> Result ) -> Result, Error> + TNetP, TExPool, TRpc, Backend>, Error> where TSc: Clone { let import_queue = builder( &self.config, @@ -370,7 +360,7 @@ impl) -> Result ) -> Result, Error> { + UNetP, TExPool, TRpc, Backend>, Error> { let network_protocol = network_protocol_builder(&self.config)?; Ok(ServiceBuilder { @@ -397,7 +387,7 @@ impl, Error> { let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?; @@ -440,7 +429,7 @@ impl, Error> { self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some)) @@ -483,7 +471,7 @@ impl, ) -> Result<(UImpQu, Option), Error> ) -> Result, Error> + TNetP, TExPool, TRpc, Backend>, Error> where TSc: Clone, TFchr: Clone { let (import_queue, fprb) = builder( &self.config, @@ -507,7 +495,7 @@ impl, ) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> + TNetP, TExPool, TRpc, Backend>, Error> where TSc: Clone, TFchr: Clone { self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx| builder(cfg, cl, b, f, sc, tx) @@ -538,7 +526,7 @@ impl) -> Result ) -> Result, Error> { + TNetP, UExPool, TRpc, Backend>, Error> { let transaction_pool = transaction_pool_builder(self.config.transaction_pool.clone(), self.client.clone())?; Ok(ServiceBuilder { @@ -554,7 +542,7 @@ impl, Arc) -> URpc ) -> Result, Error> { + TNetP, TExPool, URpc, Backend>, Error> { let rpc_extensions = rpc_ext_builder(self.client.clone(), self.transaction_pool.clone()); Ok(ServiceBuilder { @@ -581,114 +569,36 @@ impl, - ) -> Result, Error> { - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - network_protocol: self.network_protocol, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - rpc_builder: self.rpc_builder, - dht_event_tx: Some(dht_event_tx), - marker: self.marker, - }) - } -} - -/// RPC handlers builder. -pub trait RpcBuilder { - /// Build chain RPC handler. - fn build_chain(&self, subscriptions: rpc::Subscriptions) -> rpc::chain::Chain; - /// Build state RPC handler. - fn build_state(&self, subscriptions: rpc::Subscriptions) -> rpc::state::State; -} - -/// RPC handlers builder for full nodes. -pub struct FullRpcBuilder { - client: Arc>, -} - -impl RpcBuilder, TFullCallExecutor, TRtApi> - for - FullRpcBuilder - where - TBl: BlockT, - TRtApi: 'static + Send + Sync, - TExecDisp: 'static + NativeExecutionDispatch, - TFullClient: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: runtime_api::Metadata, -{ - fn build_chain( - &self, - subscriptions: rpc::Subscriptions, - ) -> rpc::chain::Chain, TFullCallExecutor, TBl, TRtApi> { - rpc::chain::new_full(self.client.clone(), subscriptions) - } - - fn build_state( - &self, - subscriptions: rpc::Subscriptions, - ) -> rpc::state::State, TFullCallExecutor, TBl, TRtApi> { - rpc::state::new_full(self.client.clone(), subscriptions) - } -} - -/// RPC handlers builder for light nodes. -pub struct LightRpcBuilder, TRtApi, TExecDisp> { - client: Arc>, - remote_blockchain: Arc>, - fetcher: Arc>, -} - -impl RpcBuilder, TLightCallExecutor, TRtApi> - for - LightRpcBuilder - where - TBl: BlockT, - TRtApi: 'static + Send + Sync, - TExecDisp: 'static + NativeExecutionDispatch, -{ - fn build_chain( - &self, - subscriptions: rpc::Subscriptions, - ) -> rpc::chain::Chain, TLightCallExecutor, TBl, TRtApi> { - rpc::chain::new_light( - self.client.clone(), - subscriptions, - self.remote_blockchain.clone(), - self.fetcher.clone(), - ) - } - - fn build_state( - &self, - subscriptions: rpc::Subscriptions, - ) -> rpc::state::State, TLightCallExecutor, TBl, TRtApi> { - rpc::state::new_light( - self.client.clone(), - subscriptions, - self.remote_blockchain.clone(), - self.fetcher.clone(), - ) + /// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery + /// module. + pub fn with_dht_event_tx( + self, + dht_event_tx: mpsc::Sender, + ) -> Result, Error> { + Ok(ServiceBuilder { + config: self.config, + client: self.client, + backend: self.backend, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + rpc_extensions: self.rpc_extensions, + remote_backend: self.remote_backend, + dht_event_tx: Some(dht_event_tx), + marker: self.marker, + }) } } @@ -736,10 +646,10 @@ pub trait ServiceBuilderRevert { impl< TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, - TExPool, TRpc, TRpcB, Backend + TExPool, TRpc, Backend > ServiceBuilderImport for ServiceBuilder< TBl, TRtApi, TCfg, TGen, TCSExt, Client, - TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, Backend + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend > where TBl: BlockT::Out>, TBackend: 'static + client::backend::Backend + Send, @@ -759,9 +669,9 @@ impl< } } -impl +impl ServiceBuilderExport for ServiceBuilder, - TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, TBackend> + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TBackend> where TBl: BlockT::Out>, TBackend: 'static + client::backend::Backend + Send, @@ -782,9 +692,9 @@ where } } -impl +impl ServiceBuilderRevert for ServiceBuilder, - TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, TBackend> + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TBackend> where TBl: BlockT::Out>, TBackend: 'static + client::backend::Backend + Send, @@ -801,7 +711,7 @@ where } } -impl +impl ServiceBuilder< TBl, TRtApi, @@ -817,7 +727,6 @@ ServiceBuilder< TNetP, TransactionPool, TRpc, - TRpcB, TBackend, > where Client: ProvideRuntimeApi, @@ -838,10 +747,9 @@ ServiceBuilder< TNetP: NetworkSpecialization, TExPoolApi: 'static + ChainApi::Hash>, TRpc: rpc::RpcExtension + Clone, - TRpcB: RpcBuilder, { /// Builds the service. - pub fn build(self) -> Result Result, TSc, @@ -858,7 +766,7 @@ ServiceBuilder< marker: _, mut config, client, - fetcher, + fetcher: on_demand, backend, keystore, select_chain, @@ -868,8 +776,8 @@ ServiceBuilder< network_protocol, transaction_pool, rpc_extensions, + remote_backend, dht_event_tx, - rpc_builder, } = self; session::generate_initial_session_keys( @@ -877,74 +785,344 @@ ServiceBuilder< config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default() )?; - new_impl!( - TBl, - config, - move |_| -> Result<_, Error> { - Ok(( - client, - fetcher, - backend, - keystore, - select_chain, - import_queue, - finality_proof_request_builder, - finality_proof_provider, - network_protocol, - transaction_pool, - rpc_extensions, - dht_event_tx, - )) + let (signal, exit) = exit_future::signal(); + + // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. + let (to_spawn_tx, to_spawn_rx) = + mpsc::unbounded:: + Send>>(); + + let import_queue = Box::new(import_queue); + let chain_info = client.info().chain; + + let version = config.full_version(); + info!("Highest known block at #{}", chain_info.best_number); + telemetry!( + SUBSTRATE_INFO; + "node.start"; + "height" => chain_info.best_number.saturated_into::(), + "best" => ?chain_info.best_hash + ); + + let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { + imports_external_transactions: !config.roles.is_light(), + pool: transaction_pool.clone(), + client: client.clone(), + executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }), + }); + + let protocol_id = { + let protocol_id_full = match config.chain_spec.protocol_id() { + Some(pid) => pid, + None => { + warn!("Using default protocol ID {:?} because none is configured in the \ + chain specs", DEFAULT_PROTOCOL_ID + ); + DEFAULT_PROTOCOL_ID + } + }.as_bytes(); + network::config::ProtocolId::from(protocol_id_full) + }; + + let block_announce_validator = + Box::new(consensus_common::block_validation::DefaultBlockAnnounceValidator::new(client.clone())); + + let network_params = network::config::Params { + roles: config.roles, + network_config: config.network.clone(), + chain: client.clone(), + finality_proof_provider, + finality_proof_request_builder, + on_demand: on_demand.clone(), + transaction_pool: transaction_pool_adapter.clone() as _, + import_queue, + protocol_id, + specialization: network_protocol, + block_announce_validator, + }; + + let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); + let network_mut = network::NetworkWorker::new(network_params)?; + let network = network_mut.service().clone(); + let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + + let offchain_storage = backend.offchain_storage(); + let offchain_workers = match (config.offchain_worker, offchain_storage) { + (true, Some(db)) => { + Some(Arc::new(offchain::OffchainWorkers::new(client.clone(), db))) }, - |h, c, tx, r| maintain_transaction_pool(h, c, tx, r), - |n, o, p, ns, v| offchain_workers(n, o, p, ns, v), - |c, ssb, si, te, tp, ext, ks| start_rpc(&rpc_builder, c, ssb, si, te, tp, ext, ks), + (true, None) => { + log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); + None + }, + _ => None, + }; + + { + // block notifications + let txpool = Arc::downgrade(&transaction_pool); + let wclient = Arc::downgrade(&client); + let offchain = offchain_workers.as_ref().map(Arc::downgrade); + let to_spawn_tx_ = to_spawn_tx.clone(); + let network_state_info: Arc = network.clone(); + let is_validator = config.roles.is_authority(); + + let events = client.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() + .for_each(move |notification| { + let number = *notification.header.number(); + let txpool = txpool.upgrade(); + + if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) { + let future = maintain_transaction_pool( + &BlockId::hash(notification.hash), + &client, + &*txpool, + ¬ification.retracted, + ).map_err(|e| warn!("Pool error processing new block: {:?}", e))?; + let _ = to_spawn_tx_.unbounded_send(future); + } + + let offchain = offchain.as_ref().and_then(|o| o.upgrade()); + if let (Some(txpool), Some(offchain)) = (txpool, offchain) { + let future = offchain.on_block_imported(&number, &txpool, network_state_info.clone(), is_validator) + .map(|()| Ok(())); + let _ = to_spawn_tx_.unbounded_send(Box::new(Compat::new(future))); + } + + Ok(()) + }) + .select(exit.clone()) + .then(|_| Ok(())); + let _ = to_spawn_tx.unbounded_send(Box::new(events)); + } + + { + // extrinsic notifications + let network = Arc::downgrade(&network); + let transaction_pool_ = transaction_pool.clone(); + let events = transaction_pool.import_notification_stream() + .map(|v| Ok::<_, ()>(v)).compat() + .for_each(move |_| { + if let Some(network) = network.upgrade() { + network.trigger_repropagate(); + } + let status = transaction_pool_.status(); + telemetry!(SUBSTRATE_INFO; "txpool.import"; + "ready" => status.ready, + "future" => status.future + ); + Ok(()) + }) + .select(exit.clone()) + .then(|_| Ok(())); + + let _ = to_spawn_tx.unbounded_send(Box::new(events)); + } + + // Periodically notify the telemetry. + let transaction_pool_ = transaction_pool.clone(); + let client_ = client.clone(); + let mut sys = System::new(); + let self_pid = get_current_pid().ok(); + let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); + network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); + let tel_task = state_rx.for_each(move |(net_status, _)| { + let info = client_.info(); + let best_number = info.chain.best_number.saturated_into::(); + let best_hash = info.chain.best_hash; + let num_peers = net_status.num_connected_peers; + let txpool_status = transaction_pool_.status(); + let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); + let bandwidth_download = net_status.average_download_per_sec; + let bandwidth_upload = net_status.average_upload_per_sec; + + let used_state_cache_size = match info.used_state_cache_size { + Some(size) => size, + None => 0, + }; + + // get cpu usage and memory usage of this process + let (cpu_usage, memory) = if let Some(self_pid) = self_pid { + if sys.refresh_process(self_pid) { + let proc = sys.get_process(self_pid) + .expect("Above refresh_process succeeds, this should be Some(), qed"); + (proc.cpu_usage(), proc.memory()) + } else { (0.0, 0) } + } else { (0.0, 0) }; + + telemetry!( + SUBSTRATE_INFO; + "system.interval"; + "peers" => num_peers, + "height" => best_number, + "best" => ?best_hash, + "txcount" => txpool_status.ready, + "cpu" => cpu_usage, + "memory" => memory, + "finalized_height" => finalized_number, + "finalized_hash" => ?info.chain.finalized_hash, + "bandwidth_download" => bandwidth_download, + "bandwidth_upload" => bandwidth_upload, + "used_state_cache_size" => used_state_cache_size, + ); + + Ok(()) + }).select(exit.clone()).then(|_| Ok(())); + let _ = to_spawn_tx.unbounded_send(Box::new(tel_task)); + + // Periodically send the network state to the telemetry. + let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); + network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); + let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| { + telemetry!( + SUBSTRATE_INFO; + "system.network_state"; + "state" => network_state, + ); + Ok(()) + }).select(exit.clone()).then(|_| Ok(())); + let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2)); + + // RPC + let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded(); + let gen_handler = || { + use rpc::{chain, state, author, system}; + + let system_info = rpc::system::SystemInfo { + chain_name: config.chain_spec.name().into(), + impl_name: config.impl_name.into(), + impl_version: config.impl_version.into(), + properties: config.chain_spec.properties().clone(), + }; + + let subscriptions = rpc::Subscriptions::new(Arc::new(SpawnTaskHandle { + sender: to_spawn_tx.clone(), + on_exit: exit.clone() + })); + + let (chain, state) = if let (Some(remote_backend), Some(on_demand)) = + (remote_backend.as_ref(), on_demand.as_ref()) { + // Light clients + let chain = rpc::chain::new_light( + client.clone(), + subscriptions.clone(), + remote_backend.clone(), + on_demand.clone() + ); + let state = rpc::state::new_light( + client.clone(), + subscriptions.clone(), + remote_backend.clone(), + on_demand.clone() + ); + (chain, state) + + } else { + // Full nodes + let chain = rpc::chain::new_full(client.clone(), subscriptions.clone()); + let state = rpc::state::new_full(client.clone(), subscriptions.clone()); + (chain, state) + }; + + let author = rpc::author::Author::new( + client.clone(), + transaction_pool.clone(), + subscriptions, + keystore.clone(), + ); + let system = system::System::new(system_info, system_rpc_tx.clone()); + + rpc_servers::rpc_handler(( + state::StateApi::to_delegate(state), + chain::ChainApi::to_delegate(chain), + author::AuthorApi::to_delegate(author), + system::SystemApi::to_delegate(system), + rpc_extensions.clone(), + )) + }; + let rpc_handlers = gen_handler(); + let rpc = start_rpc_servers(&config, gen_handler)?; + + + let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( + config.roles, + network_mut, + client.clone(), + network_status_sinks.clone(), + system_rpc_rx, + has_bootnodes, + dht_event_tx, ) + .map_err(|_| ()) + .select(exit.clone()) + .then(|_| Ok(())))); + + let telemetry_connection_sinks: Arc>>> = Default::default(); + + // Telemetry + let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { + let is_authority = config.roles.is_authority(); + let network_id = network.local_peer_id().to_base58(); + let name = config.name.clone(); + let impl_name = config.impl_name.to_owned(); + let version = version.clone(); + let chain_name = config.chain_spec.name().to_owned(); + let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); + let telemetry = tel::init_telemetry(tel::TelemetryConfig { + endpoints, + wasm_external_transport: config.telemetry_external_transport.take(), + }); + let future = telemetry.clone() + .map(|ev| Ok::<_, ()>(ev)) + .compat() + .for_each(move |event| { + // Safe-guard in case we add more events in the future. + let tel::TelemetryEvent::Connected = event; + + telemetry!(SUBSTRATE_INFO; "system.connected"; + "name" => name.clone(), + "implementation" => impl_name.clone(), + "version" => version.clone(), + "config" => "", + "chain" => chain_name.clone(), + "authority" => is_authority, + "network_id" => network_id.clone() + ); + + telemetry_connection_sinks_.lock().retain(|sink| { + sink.unbounded_send(()).is_ok() + }); + Ok(()) + }); + let _ = to_spawn_tx.unbounded_send(Box::new(future + .select(exit.clone()) + .then(|_| Ok(())))); + telemetry + }); + + Ok(Service { + client, + network, + network_status_sinks, + select_chain, + transaction_pool, + exit, + signal: Some(signal), + essential_failed: Arc::new(AtomicBool::new(false)), + to_spawn_tx, + to_spawn_rx, + to_poll: Vec::new(), + rpc_handlers, + _rpc: rpc, + _telemetry: telemetry, + _offchain_workers: offchain_workers, + _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), + keystore, + marker: PhantomData::, + }) } } -pub(crate) fn start_rpc( - rpc_builder: &RpcB, - client: Arc>, - system_send_back: futures03::channel::mpsc::UnboundedSender>, - rpc_system_info: SystemInfo, - task_executor: TaskExecutor, - transaction_pool: Arc>, - rpc_extensions: impl rpc::RpcExtension, - keystore: KeyStorePtr, -) -> rpc_servers::RpcHandler -where - Block: BlockT::Out>, - Backend: client::backend::Backend + 'static, - Client: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: - runtime_api::Metadata + session::SessionKeys, - Api: Send + Sync + 'static, - Executor: client::CallExecutor + Send + Sync + Clone + 'static, - PoolApi: txpool::ChainApi + 'static, - RpcB: RpcBuilder, -{ - use rpc::{chain, state, author, system}; - let subscriptions = rpc::Subscriptions::new(task_executor); - let chain = rpc_builder.build_chain(subscriptions.clone()); - let state = rpc_builder.build_state(subscriptions.clone()); - let author = rpc::author::Author::new( - client, - transaction_pool, - subscriptions, - keystore, - ); - let system = system::System::new(rpc_system_info, system_send_back); - - rpc_servers::rpc_handler(( - state::StateApi::to_delegate(state), - chain::ChainApi::to_delegate(chain), - author::AuthorApi::to_delegate(author), - system::SystemApi::to_delegate(system), - rpc_extensions, - )) -} - pub(crate) fn maintain_transaction_pool( id: &BlockId, client: &Arc>, @@ -997,32 +1175,6 @@ pub(crate) fn maintain_transaction_pool( }) } -pub(crate) fn offchain_workers( - number: &NumberFor, - offchain: &offchain::OffchainWorkers< - Client, - >::OffchainStorage, - Block - >, - pool: &Arc>, - network_state: &Arc, - is_validator: bool, -) -> error::Result + Send>> -where - Block: BlockT::Out>, - Backend: client::backend::Backend + 'static, - Api: 'static, - >::OffchainStorage: 'static, - Client: ProvideRuntimeApi + Send + Sync, - as ProvideRuntimeApi>::Api: offchain::OffchainWorkerApi, - Executor: client::CallExecutor + 'static, - PoolApi: txpool::ChainApi + 'static, -{ - let future = offchain.on_block_imported(number, pool, network_state.clone(), is_validator) - .map(|()| Ok(())); - Ok(Box::new(Compat::new(future))) -} - #[cfg(test)] mod tests { use super::*; diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 334a001a48..3057b8ce4d 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -24,6 +24,7 @@ pub mod config; pub mod chain_ops; pub mod error; +mod builder; mod status_sinks; use std::io; @@ -71,7 +72,7 @@ pub use futures::future::Executor; const DEFAULT_PROTOCOL_ID: &str = "sup"; /// Substrate service. -pub struct NewService { +pub struct Service { client: Arc, select_chain: Option, network: Arc, @@ -129,338 +130,6 @@ impl Executor + Send>> for SpawnTaskHandle } } -macro_rules! new_impl { - ( - $block:ty, - $config:ident, - $build_components:expr, - $maintain_transaction_pool:expr, - $offchain_workers:expr, - $start_rpc:expr, - ) => {{ - let (signal, exit) = exit_future::signal(); - - // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. - let (to_spawn_tx, to_spawn_rx) = - mpsc::unbounded:: + Send>>(); - - // Create all the components. - let ( - client, - on_demand, - backend, - keystore, - select_chain, - import_queue, - finality_proof_request_builder, - finality_proof_provider, - network_protocol, - transaction_pool, - rpc_extensions, - dht_event_tx, - ) = $build_components(&$config)?; - let import_queue = Box::new(import_queue); - let chain_info = client.info().chain; - - let version = $config.full_version(); - info!("Highest known block at #{}", chain_info.best_number); - telemetry!( - SUBSTRATE_INFO; - "node.start"; - "height" => chain_info.best_number.saturated_into::(), - "best" => ?chain_info.best_hash - ); - - let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { - imports_external_transactions: !$config.roles.is_light(), - pool: transaction_pool.clone(), - client: client.clone(), - executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }), - }); - - let protocol_id = { - let protocol_id_full = match $config.chain_spec.protocol_id() { - Some(pid) => pid, - None => { - warn!("Using default protocol ID {:?} because none is configured in the \ - chain specs", DEFAULT_PROTOCOL_ID - ); - DEFAULT_PROTOCOL_ID - } - }.as_bytes(); - network::config::ProtocolId::from(protocol_id_full) - }; - - let block_announce_validator = - Box::new(consensus_common::block_validation::DefaultBlockAnnounceValidator::new(client.clone())); - - let network_params = network::config::Params { - roles: $config.roles, - network_config: $config.network.clone(), - chain: client.clone(), - finality_proof_provider, - finality_proof_request_builder, - on_demand, - transaction_pool: transaction_pool_adapter.clone() as _, - import_queue, - protocol_id, - specialization: network_protocol, - block_announce_validator, - }; - - let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); - let network_mut = network::NetworkWorker::new(network_params)?; - let network = network_mut.service().clone(); - let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); - - let offchain_storage = backend.offchain_storage(); - let offchain_workers = match ($config.offchain_worker, offchain_storage) { - (true, Some(db)) => { - Some(Arc::new(offchain::OffchainWorkers::new(client.clone(), db))) - }, - (true, None) => { - log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); - None - }, - _ => None, - }; - - { - // block notifications - let txpool = Arc::downgrade(&transaction_pool); - let wclient = Arc::downgrade(&client); - let offchain = offchain_workers.as_ref().map(Arc::downgrade); - let to_spawn_tx_ = to_spawn_tx.clone(); - let network_state_info: Arc = network.clone(); - let is_validator = $config.roles.is_authority(); - - let events = client.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .for_each(move |notification| { - let number = *notification.header.number(); - let txpool = txpool.upgrade(); - - if let (Some(txpool), Some(client)) = (txpool.as_ref(), wclient.upgrade()) { - let future = $maintain_transaction_pool( - &BlockId::hash(notification.hash), - &client, - &*txpool, - ¬ification.retracted, - ).map_err(|e| warn!("Pool error processing new block: {:?}", e))?; - let _ = to_spawn_tx_.unbounded_send(future); - } - - let offchain = offchain.as_ref().and_then(|o| o.upgrade()); - if let (Some(txpool), Some(offchain)) = (txpool, offchain) { - let future = $offchain_workers( - &number, - &offchain, - &txpool, - &network_state_info, - is_validator, - ).map_err(|e| warn!("Offchain workers error processing new block: {:?}", e))?; - let _ = to_spawn_tx_.unbounded_send(future); - } - - Ok(()) - }) - .select(exit.clone()) - .then(|_| Ok(())); - let _ = to_spawn_tx.unbounded_send(Box::new(events)); - } - - { - // extrinsic notifications - let network = Arc::downgrade(&network); - let transaction_pool_ = transaction_pool.clone(); - let events = transaction_pool.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .for_each(move |_| { - if let Some(network) = network.upgrade() { - network.trigger_repropagate(); - } - let status = transaction_pool_.status(); - telemetry!(SUBSTRATE_INFO; "txpool.import"; - "ready" => status.ready, - "future" => status.future - ); - Ok(()) - }) - .select(exit.clone()) - .then(|_| Ok(())); - - let _ = to_spawn_tx.unbounded_send(Box::new(events)); - } - - // Periodically notify the telemetry. - let transaction_pool_ = transaction_pool.clone(); - let client_ = client.clone(); - let mut sys = System::new(); - let self_pid = get_current_pid().ok(); - let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); - network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); - let tel_task = state_rx.for_each(move |(net_status, _)| { - let info = client_.info(); - let best_number = info.chain.best_number.saturated_into::(); - let best_hash = info.chain.best_hash; - let num_peers = net_status.num_connected_peers; - let txpool_status = transaction_pool_.status(); - let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); - let bandwidth_download = net_status.average_download_per_sec; - let bandwidth_upload = net_status.average_upload_per_sec; - - let used_state_cache_size = match info.used_state_cache_size { - Some(size) => size, - None => 0, - }; - - // get cpu usage and memory usage of this process - let (cpu_usage, memory) = if let Some(self_pid) = self_pid { - if sys.refresh_process(self_pid) { - let proc = sys.get_process(self_pid) - .expect("Above refresh_process succeeds, this should be Some(), qed"); - (proc.cpu_usage(), proc.memory()) - } else { (0.0, 0) } - } else { (0.0, 0) }; - - telemetry!( - SUBSTRATE_INFO; - "system.interval"; - "peers" => num_peers, - "height" => best_number, - "best" => ?best_hash, - "txcount" => txpool_status.ready, - "cpu" => cpu_usage, - "memory" => memory, - "finalized_height" => finalized_number, - "finalized_hash" => ?info.chain.finalized_hash, - "bandwidth_download" => bandwidth_download, - "bandwidth_upload" => bandwidth_upload, - "used_state_cache_size" => used_state_cache_size, - ); - - Ok(()) - }).select(exit.clone()).then(|_| Ok(())); - let _ = to_spawn_tx.unbounded_send(Box::new(tel_task)); - - // Periodically send the network state to the telemetry. - let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); - network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); - let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| { - telemetry!( - SUBSTRATE_INFO; - "system.network_state"; - "state" => network_state, - ); - Ok(()) - }).select(exit.clone()).then(|_| Ok(())); - let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2)); - - // RPC - let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded(); - let gen_handler = || { - let system_info = rpc::system::SystemInfo { - chain_name: $config.chain_spec.name().into(), - impl_name: $config.impl_name.into(), - impl_version: $config.impl_version.into(), - properties: $config.chain_spec.properties().clone(), - }; - $start_rpc( - client.clone(), - //light_components.clone(), - system_rpc_tx.clone(), - system_info.clone(), - Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }), - transaction_pool.clone(), - rpc_extensions.clone(), - keystore.clone(), - ) - }; - let rpc_handlers = gen_handler(); - let rpc = start_rpc_servers(&$config, gen_handler)?; - - - let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( - $config.roles, - network_mut, - client.clone(), - network_status_sinks.clone(), - system_rpc_rx, - has_bootnodes, - dht_event_tx, - ) - .map_err(|_| ()) - .select(exit.clone()) - .then(|_| Ok(())))); - - let telemetry_connection_sinks: Arc>>> = Default::default(); - - // Telemetry - let telemetry = $config.telemetry_endpoints.clone().map(|endpoints| { - let is_authority = $config.roles.is_authority(); - let network_id = network.local_peer_id().to_base58(); - let name = $config.name.clone(); - let impl_name = $config.impl_name.to_owned(); - let version = version.clone(); - let chain_name = $config.chain_spec.name().to_owned(); - let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); - let telemetry = tel::init_telemetry(tel::TelemetryConfig { - endpoints, - wasm_external_transport: $config.telemetry_external_transport.take(), - }); - let future = telemetry.clone() - .map(|ev| Ok::<_, ()>(ev)) - .compat() - .for_each(move |event| { - // Safe-guard in case we add more events in the future. - let tel::TelemetryEvent::Connected = event; - - telemetry!(SUBSTRATE_INFO; "system.connected"; - "name" => name.clone(), - "implementation" => impl_name.clone(), - "version" => version.clone(), - "config" => "", - "chain" => chain_name.clone(), - "authority" => is_authority, - "network_id" => network_id.clone() - ); - - telemetry_connection_sinks_.lock().retain(|sink| { - sink.unbounded_send(()).is_ok() - }); - Ok(()) - }); - let _ = to_spawn_tx.unbounded_send(Box::new(future - .select(exit.clone()) - .then(|_| Ok(())))); - telemetry - }); - - Ok(NewService { - client, - network, - network_status_sinks, - select_chain, - transaction_pool, - exit, - signal: Some(signal), - essential_failed: Arc::new(AtomicBool::new(false)), - to_spawn_tx, - to_spawn_rx, - to_poll: Vec::new(), - rpc_handlers, - _rpc: rpc, - _telemetry: telemetry, - _offchain_workers: offchain_workers, - _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), - keystore, - marker: PhantomData::<$block>, - }) - }} -} - -mod builder; - /// Abstraction over a Substrate service. pub trait AbstractService: 'static + Future + Executor + Send>> + Send { @@ -530,7 +199,7 @@ pub trait AbstractService: 'static + Future + } impl AbstractService for - NewService, TSc, NetworkStatus, + Service, TSc, NetworkStatus, NetworkService, TransactionPool, TOc> where TBl: BlockT, @@ -619,7 +288,7 @@ where } impl Future for - NewService + Service { type Item = (); type Error = Error; @@ -652,7 +321,7 @@ impl Future for } impl Executor + Send>> for - NewService + Service { fn execute( &self, @@ -819,7 +488,7 @@ pub struct NetworkStatus { } impl Drop for - NewService + Service { fn drop(&mut self) { debug!(target: "service", "Substrate service shutdown"); diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index 03d31c4392..78b978b72b 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -33,7 +33,7 @@ use transaction_pool::{self, txpool::{Pool as TransactionPool}}; use inherents::InherentDataProviders; use network::construct_simple_protocol; -use substrate_service::{NewService, NetworkStatus}; +use substrate_service::{Service, NetworkStatus}; use client::{Client, LocalCallExecutor}; use client_db::Backend; use sr_primitives::traits::Block as BlockT; @@ -248,7 +248,7 @@ pub type NodeConfiguration = Configuration(config: NodeConfiguration) -> Result< - NewService< + Service< ConcreteBlock, ConcreteClient, LongestChain,