diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ef24acc3d5..a9d7945194 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3980,7 +3980,6 @@ dependencies = [ "substrate-service 2.0.0", "substrate-state-machine 2.0.0", "substrate-telemetry 2.0.0", - "sysinfo 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4523,6 +4522,7 @@ dependencies = [ "substrate-telemetry 2.0.0", "substrate-test-runtime-client 2.0.0", "substrate-transaction-pool 2.0.0", + "sysinfo 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/substrate/core/cli/Cargo.toml b/substrate/core/cli/Cargo.toml index 54cde50d1a..6ca50ba5f8 100644 --- a/substrate/core/cli/Cargo.toml +++ b/substrate/core/cli/Cargo.toml @@ -21,7 +21,6 @@ futures = "0.1.17" fdlimit = "0.1" exit-future = "0.1" serde_json = "1.0" -sysinfo = "0.8.0" panic-handler = { package = "substrate-panic-handler", path = "../../core/panic-handler" } client = { package = "substrate-client", path = "../../core/client" } network = { package = "substrate-network", path = "../../core/network" } diff --git a/substrate/core/cli/src/informant.rs b/substrate/core/cli/src/informant.rs index 607ab9ae85..a6aca5edb0 100644 --- a/substrate/core/cli/src/informant.rs +++ b/substrate/core/cli/src/informant.rs @@ -22,10 +22,8 @@ use std::time; use futures::{Future, Stream}; use service::{Service, Components}; use tokio::runtime::TaskExecutor; -use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use network::{SyncState, SyncProvider}; use client::{backend::Backend, BlockchainEvents}; -use substrate_telemetry::{telemetry, SUBSTRATE_INFO}; use log::{info, warn}; use runtime_primitives::generic::BlockId; @@ -44,19 +42,14 @@ pub fn build(service: &Service) -> impl Future where C: Components { let network = service.network(); let client = service.client(); - let txpool = service.transaction_pool(); let mut last_number = None; let mut last_update = time::Instant::now(); - let mut sys = System::new(); - let self_pid = get_current_pid(); - let display_notifications = network.status().for_each(move |sync_status| { let info = client.info(); let best_number = info.chain.best_number.saturated_into::(); let best_hash = info.chain.best_hash; - let num_peers = sync_status.num_peers; let speed = move || speed(best_number, last_number, last_update); last_update = time::Instant::now(); let (status, target) = match (sync_status.sync.state, sync_status.sync.best_seen_block) { @@ -65,7 +58,6 @@ where C: Components { (SyncState::Downloading, Some(n)) => (format!("Syncing{}", speed()), format!(", target=#{}", n)), }; last_number = Some(best_number); - let txpool_status = txpool.status(); let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); let bandwidth_download = network.average_download_per_sec(); let bandwidth_upload = network.average_upload_per_sec(); @@ -83,39 +75,6 @@ where C: Components { TransferRateFormat(bandwidth_upload), ); - #[allow(deprecated)] - let backend = (*client).backend(); - let used_state_cache_size = match backend.used_state_cache_size(){ - Some(size) => size, - None => 0, - }; - - // get cpu usage and memory usage of this process - let (cpu_usage, memory) = if sys.refresh_process(self_pid) { - let proc = sys.get_process(self_pid).expect("Above refresh_process succeeds, this should be Some(), qed"); - (proc.cpu_usage(), proc.memory()) - } else { (0.0, 0) }; - - let network_state = network.network_state(); - - telemetry!( - SUBSTRATE_INFO; - "system.interval"; - "network_state" => network_state, - "status" => format!("{}{}", status, target), - "peers" => num_peers, - "height" => best_number, - "best" => ?best_hash, - "txcount" => txpool_status.ready, - "cpu" => cpu_usage, - "memory" => memory, - "finalized_height" => finalized_number, - "finalized_hash" => ?info.chain.finalized_hash, - "bandwidth_download" => bandwidth_download, - "bandwidth_upload" => bandwidth_upload, - "used_state_cache_size" => used_state_cache_size, - ); - Ok(()) }); @@ -155,15 +114,8 @@ where C: Components { Ok(()) }); - let txpool = service.transaction_pool(); - let display_txpool_import = txpool.import_notification_stream().for_each(move |_| { - let status = txpool.status(); - telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future); - Ok(()) - }); - - display_notifications.join3(display_block_import, display_txpool_import) - .map(|((), (), ())| ()) + display_notifications.join(display_block_import) + .map(|((), ())| ()) } fn speed(best_number: u64, last_number: Option, last_update: time::Instant) -> String { diff --git a/substrate/core/service/Cargo.toml b/substrate/core/service/Cargo.toml index 471594aad8..21501286d4 100644 --- a/substrate/core/service/Cargo.toml +++ b/substrate/core/service/Cargo.toml @@ -15,6 +15,7 @@ tokio = "0.1.7" exit-future = "0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +sysinfo = "0.8.0" target_info = "0.1" keystore = { package = "substrate-keystore", path = "../../core/keystore" } sr-io = { path = "../../core/sr-io" } diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 92ddcd0b58..d1e0067a62 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -31,7 +31,7 @@ use std::collections::HashMap; use futures::sync::mpsc; use parking_lot::Mutex; -use client::BlockchainEvents; +use client::{BlockchainEvents, backend::Backend}; use exit_future::Signal; use futures::prelude::*; use keystore::Store as Keystore; @@ -41,6 +41,8 @@ use primitives::Pair; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Header, SaturatedConversion}; use substrate_executor::NativeExecutor; +use network::SyncProvider; +use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; pub use self::error::Error; @@ -305,11 +307,17 @@ impl Service { { // extrinsic notifications let network = Arc::downgrade(&network); + let transaction_pool_ = transaction_pool.clone(); let events = transaction_pool.import_notification_stream() .for_each(move |_| { if let Some(network) = network.upgrade() { network.trigger_repropagate(); } + let status = transaction_pool_.status(); + telemetry!(SUBSTRATE_INFO; "txpool.import"; + "ready" => status.ready, + "future" => status.future + ); Ok(()) }) .select(exit.clone()) @@ -318,6 +326,56 @@ impl Service { task_executor.spawn(events); } + // Periodically notify the telemetry. + let transaction_pool_ = transaction_pool.clone(); + let client_ = client.clone(); + let network_ = network.clone(); + let mut sys = System::new(); + let self_pid = get_current_pid(); + task_executor.spawn(network.status().for_each(move |sync_status| { + let info = client_.info(); + let best_number = info.chain.best_number.saturated_into::(); + let best_hash = info.chain.best_hash; + let num_peers = sync_status.num_peers; + let txpool_status = transaction_pool_.status(); + let finalized_number: u64 = info.chain.finalized_number.saturated_into::(); + let bandwidth_download = network_.average_download_per_sec(); + let bandwidth_upload = network_.average_upload_per_sec(); + + #[allow(deprecated)] + let backend = (*client_).backend(); + let used_state_cache_size = match backend.used_state_cache_size(){ + Some(size) => size, + None => 0, + }; + + // get cpu usage and memory usage of this process + let (cpu_usage, memory) = if sys.refresh_process(self_pid) { + let proc = sys.get_process(self_pid).expect("Above refresh_process succeeds, this should be Some(), qed"); + (proc.cpu_usage(), proc.memory()) + } else { (0.0, 0) }; + + let network_state = network_.network_state(); + + telemetry!( + SUBSTRATE_INFO; + "system.interval"; + "network_state" => network_state, + "peers" => num_peers, + "height" => best_number, + "best" => ?best_hash, + "txcount" => txpool_status.ready, + "cpu" => cpu_usage, + "memory" => memory, + "finalized_height" => finalized_number, + "finalized_hash" => ?info.chain.finalized_hash, + "bandwidth_download" => bandwidth_download, + "bandwidth_upload" => bandwidth_upload, + "used_state_cache_size" => used_state_cache_size, + ); + + Ok(()) + }).select(exit.clone()).then(|_| Ok(()))); // RPC let system_info = rpc::apis::system::SystemInfo {