mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 15:11:02 +00:00
Move the telemetry information to service (#2846)
* Move the telemetry information to service * Proper exit
This commit is contained in:
committed by
Gavin Wood
parent
ce302390dd
commit
68f4d11df3
Generated
+1
-1
@@ -3980,7 +3980,6 @@ dependencies = [
|
|||||||
"substrate-service 2.0.0",
|
"substrate-service 2.0.0",
|
||||||
"substrate-state-machine 2.0.0",
|
"substrate-state-machine 2.0.0",
|
||||||
"substrate-telemetry 2.0.0",
|
"substrate-telemetry 2.0.0",
|
||||||
"sysinfo 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
|
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
@@ -4523,6 +4522,7 @@ dependencies = [
|
|||||||
"substrate-telemetry 2.0.0",
|
"substrate-telemetry 2.0.0",
|
||||||
"substrate-test-runtime-client 2.0.0",
|
"substrate-test-runtime-client 2.0.0",
|
||||||
"substrate-transaction-pool 2.0.0",
|
"substrate-transaction-pool 2.0.0",
|
||||||
|
"sysinfo 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
"tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ futures = "0.1.17"
|
|||||||
fdlimit = "0.1"
|
fdlimit = "0.1"
|
||||||
exit-future = "0.1"
|
exit-future = "0.1"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
sysinfo = "0.8.0"
|
|
||||||
panic-handler = { package = "substrate-panic-handler", path = "../../core/panic-handler" }
|
panic-handler = { package = "substrate-panic-handler", path = "../../core/panic-handler" }
|
||||||
client = { package = "substrate-client", path = "../../core/client" }
|
client = { package = "substrate-client", path = "../../core/client" }
|
||||||
network = { package = "substrate-network", path = "../../core/network" }
|
network = { package = "substrate-network", path = "../../core/network" }
|
||||||
|
|||||||
@@ -22,10 +22,8 @@ use std::time;
|
|||||||
use futures::{Future, Stream};
|
use futures::{Future, Stream};
|
||||||
use service::{Service, Components};
|
use service::{Service, Components};
|
||||||
use tokio::runtime::TaskExecutor;
|
use tokio::runtime::TaskExecutor;
|
||||||
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
|
|
||||||
use network::{SyncState, SyncProvider};
|
use network::{SyncState, SyncProvider};
|
||||||
use client::{backend::Backend, BlockchainEvents};
|
use client::{backend::Backend, BlockchainEvents};
|
||||||
use substrate_telemetry::{telemetry, SUBSTRATE_INFO};
|
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
|
|
||||||
use runtime_primitives::generic::BlockId;
|
use runtime_primitives::generic::BlockId;
|
||||||
@@ -44,19 +42,14 @@ pub fn build<C>(service: &Service<C>) -> impl Future<Item = (), Error = ()>
|
|||||||
where C: Components {
|
where C: Components {
|
||||||
let network = service.network();
|
let network = service.network();
|
||||||
let client = service.client();
|
let client = service.client();
|
||||||
let txpool = service.transaction_pool();
|
|
||||||
let mut last_number = None;
|
let mut last_number = None;
|
||||||
let mut last_update = time::Instant::now();
|
let mut last_update = time::Instant::now();
|
||||||
|
|
||||||
let mut sys = System::new();
|
|
||||||
let self_pid = get_current_pid();
|
|
||||||
|
|
||||||
let display_notifications = network.status().for_each(move |sync_status| {
|
let display_notifications = network.status().for_each(move |sync_status| {
|
||||||
|
|
||||||
let info = client.info();
|
let info = client.info();
|
||||||
let best_number = info.chain.best_number.saturated_into::<u64>();
|
let best_number = info.chain.best_number.saturated_into::<u64>();
|
||||||
let best_hash = info.chain.best_hash;
|
let best_hash = info.chain.best_hash;
|
||||||
let num_peers = sync_status.num_peers;
|
|
||||||
let speed = move || speed(best_number, last_number, last_update);
|
let speed = move || speed(best_number, last_number, last_update);
|
||||||
last_update = time::Instant::now();
|
last_update = time::Instant::now();
|
||||||
let (status, target) = match (sync_status.sync.state, sync_status.sync.best_seen_block) {
|
let (status, target) = match (sync_status.sync.state, sync_status.sync.best_seen_block) {
|
||||||
@@ -65,7 +58,6 @@ where C: Components {
|
|||||||
(SyncState::Downloading, Some(n)) => (format!("Syncing{}", speed()), format!(", target=#{}", n)),
|
(SyncState::Downloading, Some(n)) => (format!("Syncing{}", speed()), format!(", target=#{}", n)),
|
||||||
};
|
};
|
||||||
last_number = Some(best_number);
|
last_number = Some(best_number);
|
||||||
let txpool_status = txpool.status();
|
|
||||||
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
|
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
|
||||||
let bandwidth_download = network.average_download_per_sec();
|
let bandwidth_download = network.average_download_per_sec();
|
||||||
let bandwidth_upload = network.average_upload_per_sec();
|
let bandwidth_upload = network.average_upload_per_sec();
|
||||||
@@ -83,39 +75,6 @@ where C: Components {
|
|||||||
TransferRateFormat(bandwidth_upload),
|
TransferRateFormat(bandwidth_upload),
|
||||||
);
|
);
|
||||||
|
|
||||||
#[allow(deprecated)]
|
|
||||||
let backend = (*client).backend();
|
|
||||||
let used_state_cache_size = match backend.used_state_cache_size(){
|
|
||||||
Some(size) => size,
|
|
||||||
None => 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
// get cpu usage and memory usage of this process
|
|
||||||
let (cpu_usage, memory) = if sys.refresh_process(self_pid) {
|
|
||||||
let proc = sys.get_process(self_pid).expect("Above refresh_process succeeds, this should be Some(), qed");
|
|
||||||
(proc.cpu_usage(), proc.memory())
|
|
||||||
} else { (0.0, 0) };
|
|
||||||
|
|
||||||
let network_state = network.network_state();
|
|
||||||
|
|
||||||
telemetry!(
|
|
||||||
SUBSTRATE_INFO;
|
|
||||||
"system.interval";
|
|
||||||
"network_state" => network_state,
|
|
||||||
"status" => format!("{}{}", status, target),
|
|
||||||
"peers" => num_peers,
|
|
||||||
"height" => best_number,
|
|
||||||
"best" => ?best_hash,
|
|
||||||
"txcount" => txpool_status.ready,
|
|
||||||
"cpu" => cpu_usage,
|
|
||||||
"memory" => memory,
|
|
||||||
"finalized_height" => finalized_number,
|
|
||||||
"finalized_hash" => ?info.chain.finalized_hash,
|
|
||||||
"bandwidth_download" => bandwidth_download,
|
|
||||||
"bandwidth_upload" => bandwidth_upload,
|
|
||||||
"used_state_cache_size" => used_state_cache_size,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -155,15 +114,8 @@ where C: Components {
|
|||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
let txpool = service.transaction_pool();
|
display_notifications.join(display_block_import)
|
||||||
let display_txpool_import = txpool.import_notification_stream().for_each(move |_| {
|
.map(|((), ())| ())
|
||||||
let status = txpool.status();
|
|
||||||
telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future);
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
display_notifications.join3(display_block_import, display_txpool_import)
|
|
||||||
.map(|((), (), ())| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn speed(best_number: u64, last_number: Option<u64>, last_update: time::Instant) -> String {
|
fn speed(best_number: u64, last_number: Option<u64>, last_update: time::Instant) -> String {
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ tokio = "0.1.7"
|
|||||||
exit-future = "0.1"
|
exit-future = "0.1"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
sysinfo = "0.8.0"
|
||||||
target_info = "0.1"
|
target_info = "0.1"
|
||||||
keystore = { package = "substrate-keystore", path = "../../core/keystore" }
|
keystore = { package = "substrate-keystore", path = "../../core/keystore" }
|
||||||
sr-io = { path = "../../core/sr-io" }
|
sr-io = { path = "../../core/sr-io" }
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ use std::collections::HashMap;
|
|||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
use client::BlockchainEvents;
|
use client::{BlockchainEvents, backend::Backend};
|
||||||
use exit_future::Signal;
|
use exit_future::Signal;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use keystore::Store as Keystore;
|
use keystore::Store as Keystore;
|
||||||
@@ -41,6 +41,8 @@ use primitives::Pair;
|
|||||||
use runtime_primitives::generic::BlockId;
|
use runtime_primitives::generic::BlockId;
|
||||||
use runtime_primitives::traits::{Header, SaturatedConversion};
|
use runtime_primitives::traits::{Header, SaturatedConversion};
|
||||||
use substrate_executor::NativeExecutor;
|
use substrate_executor::NativeExecutor;
|
||||||
|
use network::SyncProvider;
|
||||||
|
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
|
||||||
use tel::{telemetry, SUBSTRATE_INFO};
|
use tel::{telemetry, SUBSTRATE_INFO};
|
||||||
|
|
||||||
pub use self::error::Error;
|
pub use self::error::Error;
|
||||||
@@ -305,11 +307,17 @@ impl<Components: components::Components> Service<Components> {
|
|||||||
{
|
{
|
||||||
// extrinsic notifications
|
// extrinsic notifications
|
||||||
let network = Arc::downgrade(&network);
|
let network = Arc::downgrade(&network);
|
||||||
|
let transaction_pool_ = transaction_pool.clone();
|
||||||
let events = transaction_pool.import_notification_stream()
|
let events = transaction_pool.import_notification_stream()
|
||||||
.for_each(move |_| {
|
.for_each(move |_| {
|
||||||
if let Some(network) = network.upgrade() {
|
if let Some(network) = network.upgrade() {
|
||||||
network.trigger_repropagate();
|
network.trigger_repropagate();
|
||||||
}
|
}
|
||||||
|
let status = transaction_pool_.status();
|
||||||
|
telemetry!(SUBSTRATE_INFO; "txpool.import";
|
||||||
|
"ready" => status.ready,
|
||||||
|
"future" => status.future
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.select(exit.clone())
|
.select(exit.clone())
|
||||||
@@ -318,6 +326,56 @@ impl<Components: components::Components> Service<Components> {
|
|||||||
task_executor.spawn(events);
|
task_executor.spawn(events);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Periodically notify the telemetry.
|
||||||
|
let transaction_pool_ = transaction_pool.clone();
|
||||||
|
let client_ = client.clone();
|
||||||
|
let network_ = network.clone();
|
||||||
|
let mut sys = System::new();
|
||||||
|
let self_pid = get_current_pid();
|
||||||
|
task_executor.spawn(network.status().for_each(move |sync_status| {
|
||||||
|
let info = client_.info();
|
||||||
|
let best_number = info.chain.best_number.saturated_into::<u64>();
|
||||||
|
let best_hash = info.chain.best_hash;
|
||||||
|
let num_peers = sync_status.num_peers;
|
||||||
|
let txpool_status = transaction_pool_.status();
|
||||||
|
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
|
||||||
|
let bandwidth_download = network_.average_download_per_sec();
|
||||||
|
let bandwidth_upload = network_.average_upload_per_sec();
|
||||||
|
|
||||||
|
#[allow(deprecated)]
|
||||||
|
let backend = (*client_).backend();
|
||||||
|
let used_state_cache_size = match backend.used_state_cache_size(){
|
||||||
|
Some(size) => size,
|
||||||
|
None => 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
// get cpu usage and memory usage of this process
|
||||||
|
let (cpu_usage, memory) = if sys.refresh_process(self_pid) {
|
||||||
|
let proc = sys.get_process(self_pid).expect("Above refresh_process succeeds, this should be Some(), qed");
|
||||||
|
(proc.cpu_usage(), proc.memory())
|
||||||
|
} else { (0.0, 0) };
|
||||||
|
|
||||||
|
let network_state = network_.network_state();
|
||||||
|
|
||||||
|
telemetry!(
|
||||||
|
SUBSTRATE_INFO;
|
||||||
|
"system.interval";
|
||||||
|
"network_state" => network_state,
|
||||||
|
"peers" => num_peers,
|
||||||
|
"height" => best_number,
|
||||||
|
"best" => ?best_hash,
|
||||||
|
"txcount" => txpool_status.ready,
|
||||||
|
"cpu" => cpu_usage,
|
||||||
|
"memory" => memory,
|
||||||
|
"finalized_height" => finalized_number,
|
||||||
|
"finalized_hash" => ?info.chain.finalized_hash,
|
||||||
|
"bandwidth_download" => bandwidth_download,
|
||||||
|
"bandwidth_upload" => bandwidth_upload,
|
||||||
|
"used_state_cache_size" => used_state_cache_size,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}).select(exit.clone()).then(|_| Ok(())));
|
||||||
|
|
||||||
// RPC
|
// RPC
|
||||||
let system_info = rpc::apis::system::SystemInfo {
|
let system_info = rpc::apis::system::SystemInfo {
|
||||||
|
|||||||
Reference in New Issue
Block a user