Refactor & detach network metrics. (#6986)

* Refactor sc-network/service metrics.

  1. Aggregate sc-network metrics into a submodule, introducing
  two more sourced metrics to avoid duplicate atomics.

  2. Decouple periodic sc-service network metrics from other
  metrics, so that they can be updated independently.

* Update client/service/src/metrics.rs

* Update client/service/src/metrics.rs
This commit is contained in:
Roman Borschel
2020-09-06 19:59:05 +02:00
committed by GitHub
parent e6ce3e7ac0
commit ec47877288
8 changed files with 571 additions and 403 deletions
+14 -55
View File
@@ -17,10 +17,10 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm,
error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm,
TelemetryConnectionSinks, RpcHandlers, NetworkStatusSinks,
start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle,
status_sinks, metrics::MetricsService,
metrics::MetricsService,
client::{light, Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig},
};
@@ -472,7 +472,9 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
transaction_pool,
rpc_extensions_builder,
remote_blockchain,
network, network_status_sinks, system_rpc_tx,
network,
network_status_sinks,
system_rpc_tx,
telemetry_connection_sinks,
} = params;
@@ -521,15 +523,13 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
MetricsService::new()
};
// Periodically notify the telemetry.
spawn_handle.spawn("telemetry-periodic-send", telemetry_periodic_send(
client.clone(), transaction_pool.clone(), metrics_service, network_status_sinks.clone()
));
// Periodically send the network state to the telemetry.
spawn_handle.spawn(
"telemetry-periodic-network-state",
telemetry_periodic_network_state(network_status_sinks.clone()),
// Periodically update metrics and send telemetry.
spawn_handle.spawn("telemetry-periodic-send",
metrics_service.run(
client.clone(),
transaction_pool.clone(),
network_status_sinks.clone()
)
);
// RPC
@@ -574,7 +574,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
// Spawn informant task
spawn_handle.spawn("informant", sc_informant::build(
client.clone(),
network_status_sinks.clone().0,
network_status_sinks.status.clone(),
transaction_pool.clone(),
config.informant_output_format,
));
@@ -606,47 +606,6 @@ async fn transaction_notifications<TBl, TExPool>(
.await;
}
// Periodically notify the telemetry.
//
// Registers a sender on the network status sinks with a 5000 ms interval and,
// for every item received, feeds the client usage info, the transaction pool
// status and the received network status into `MetricsService::tick`. The
// `NetworkState` half of each item is ignored here.
async fn telemetry_periodic_send<TBl, TExPool, TCl>(
	client: Arc<TCl>,
	transaction_pool: Arc<TExPool>,
	mut metrics_service: MetricsService,
	network_status_sinks: NetworkStatusSinks<TBl>,
)
	where
		TBl: BlockT,
		TCl: ProvideRuntimeApi<TBl> + UsageProvider<TBl>,
		TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>,
{
	let (sender, receiver) =
		tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
	network_status_sinks.0.push(std::time::Duration::from_millis(5000), sender);

	receiver
		.for_each(move |(status, _)| {
			metrics_service.tick(
				&client.usage_info(),
				&transaction_pool.status(),
				&status,
			);
			ready(())
		})
		.await;
}
// Periodically send the network state to the telemetry.
//
// Registers a sender on the network status sinks with a 30 second interval and
// emits a `system.network_state` telemetry message for every item received.
// The `NetworkStatus` half of each item is ignored here.
async fn telemetry_periodic_network_state<TBl: BlockT>(
	network_status_sinks: NetworkStatusSinks<TBl>,
) {
	let report_every = std::time::Duration::from_secs(30);
	let (sender, receiver) =
		tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
	network_status_sinks.0.push(report_every, sender);

	receiver
		.for_each(move |(_, state)| {
			telemetry!(
				SUBSTRATE_INFO;
				"system.network_state";
				"state" => state,
			);
			ready(())
		})
		.await;
}
fn build_telemetry<TBl: BlockT>(
config: &mut Configuration,
endpoints: sc_telemetry::TelemetryEndpoints,
@@ -887,7 +846,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = NetworkStatusSinks::new(Arc::new(status_sinks::StatusSinks::new()));
let network_status_sinks = NetworkStatusSinks::new();
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");
+34 -25
View File
@@ -126,24 +126,37 @@ impl RpcHandlers {
/// Sinks to propagate network status updates.
/// For each element, every time the `Interval` fires we push an element on the sender.
#[derive(Clone)]
pub struct NetworkStatusSinks<Block: BlockT>(
Arc<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>,
);
pub struct NetworkStatusSinks<Block: BlockT> {
status: Arc<status_sinks::StatusSinks<NetworkStatus<Block>>>,
state: Arc<status_sinks::StatusSinks<NetworkState>>,
}
impl<Block: BlockT> NetworkStatusSinks<Block> {
fn new(
sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<Block>, NetworkState)>>
) -> Self {
Self(sinks)
fn new() -> Self {
Self {
status: Arc::new(status_sinks::StatusSinks::new()),
state: Arc::new(status_sinks::StatusSinks::new()),
}
}
/// Returns a receiver that periodically receives a status of the network.
pub fn network_status(&self, interval: Duration)
-> TracingUnboundedReceiver<(NetworkStatus<Block>, NetworkState)> {
/// Returns a receiver that periodically yields a [`NetworkStatus`].
pub fn status_stream(&self, interval: Duration)
-> TracingUnboundedReceiver<NetworkStatus<Block>>
{
let (sink, stream) = tracing_unbounded("mpsc_network_status");
self.0.push(interval, sink);
self.status.push(interval, sink);
stream
}
/// Returns a receiver that periodically yields a [`NetworkState`].
pub fn state_stream(&self, interval: Duration)
-> TracingUnboundedReceiver<NetworkState>
{
let (sink, stream) = tracing_unbounded("mpsc_network_state");
self.state.push(interval, sink);
stream
}
}
/// Sinks to propagate telemetry connection established events.
@@ -319,20 +332,16 @@ async fn build_network_future<
// the network.
_ = (&mut network).fuse() => {}
// At a regular interval, we send the state of the network on what is called
// the "status sinks".
ready_sink = status_sinks.0.next().fuse() => {
let status = NetworkStatus {
sync_state: network.sync_state(),
best_seen_block: network.best_seen_block(),
num_sync_peers: network.num_sync_peers(),
num_connected_peers: network.num_connected_peers(),
num_active_peers: network.num_active_peers(),
total_bytes_inbound: network.total_bytes_inbound(),
total_bytes_outbound: network.total_bytes_outbound(),
};
let state = network.network_state();
ready_sink.send((status, state));
// At a regular interval, we send high-level status as well as
// detailed state information of the network on what are called
// "status sinks".
status_sink = status_sinks.status.next().fuse() => {
status_sink.send(network.status());
}
state_sink = status_sinks.state.next().fuse() => {
state_sink.send(network.network_state());
}
}
}
+141 -39
View File
@@ -18,14 +18,19 @@
use std::{convert::TryFrom, time::SystemTime};
use crate::{NetworkStatus, config::Configuration};
use crate::{NetworkStatus, NetworkState, NetworkStatusSinks, config::Configuration};
use futures_timer::Delay;
use prometheus_endpoint::{register, Gauge, U64, Registry, PrometheusError, Opts, GaugeVec};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_api::ProvideRuntimeApi;
use sp_runtime::traits::{NumberFor, Block, SaturatedConversion, UniqueSaturatedInto};
use sp_transaction_pool::PoolStatus;
use sp_transaction_pool::{PoolStatus, MaintainedTransactionPool};
use sp_utils::metrics::register_globals;
use sc_client_api::ClientInfo;
use sp_utils::mpsc::TracingUnboundedReceiver;
use sc_client_api::{ClientInfo, UsageProvider};
use sc_network::config::Role;
use std::sync::Arc;
use std::time::Duration;
use wasm_timer::Instant;
struct PrometheusMetrics {
@@ -99,6 +104,9 @@ impl PrometheusMetrics {
}
}
/// A `MetricsService` periodically sends general client and
/// network state to the telemetry as well as (optionally)
/// to a Prometheus endpoint.
pub struct MetricsService {
metrics: Option<PrometheusMetrics>,
last_update: Instant,
@@ -107,6 +115,8 @@ pub struct MetricsService {
}
impl MetricsService {
/// Creates a `MetricsService` that only sends information
/// to the telemetry.
pub fn new() -> Self {
MetricsService {
metrics: None,
@@ -116,6 +126,8 @@ impl MetricsService {
}
}
/// Creates a `MetricsService` that sends metrics
/// to prometheus alongside the telemetry.
pub fn with_prometheus(
registry: &Registry,
config: &Configuration,
@@ -141,60 +153,109 @@ impl MetricsService {
})
}
pub fn tick<T: Block>(
/// Returns a never-ending `Future` that performs the metric and
/// telemetry updates.
///
/// On every tick of an internal timer (5 second interval, after an
/// initial zero-length delay) the newest network status and network
/// state received from `network`, if any, are handed to `update`
/// together with the usage info of `client` and the status of the
/// `transactions` pool.
pub async fn run<TBl, TExPool, TCl>(
	mut self,
	client: Arc<TCl>,
	transactions: Arc<TExPool>,
	network: NetworkStatusSinks<TBl>,
) where
	TBl: Block,
	TCl: ProvideRuntimeApi<TBl> + UsageProvider<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as Block>::Hash>,
{
	let mut ticker = Delay::new(Duration::from_secs(0));

	// Metric and telemetry update interval; the network status is
	// requested at the same rate.
	let tick_period = Duration::from_secs(5);
	// The detailed network state is requested less frequently.
	let state_period = Duration::from_secs(30);

	// Sources of network information. A source is replaced by `None`
	// once `latest` reports an error for it.
	let mut status_source = Some(network.status_stream(tick_period));
	let mut state_source = Some(network.state_stream(state_period));

	loop {
		// Wait for the next tick of the timer.
		(&mut ticker).await;

		// Newest network status, if the source is still present.
		let net_status = match status_source.as_mut().map(|rx| Self::latest(rx)) {
			Some(Ok(newest)) => newest,
			Some(Err(())) => {
				status_source = None;
				None
			}
			None => None,
		};

		// Newest network state, handled the same way.
		let net_state = match state_source.as_mut().map(|rx| Self::latest(rx)) {
			Some(Ok(newest)) => newest,
			Some(Err(())) => {
				state_source = None;
				None
			}
			None => None,
		};

		// Update / send the metrics.
		let usage = client.usage_info();
		let pool = transactions.status();
		self.update(&usage, &pool, net_status, net_state);

		// Schedule next tick.
		ticker.reset(tick_period);
	}
}
// Try to get the latest value from a receiver, dropping intermediate values.
//
// Keeps calling `try_next` until it returns `Err`, remembering only the last
// value seen, and returns that as `Ok(Some(_))` (or `Ok(None)` if nothing was
// received). If `try_next` yields `Ok(None)`, an error is logged and `Err(())`
// is returned.
fn latest<T>(rx: &mut TracingUnboundedReceiver<T>) -> Result<Option<T>, ()> {
	let mut newest = None;
	loop {
		match rx.try_next() {
			Ok(Some(item)) => newest = Some(item),
			Ok(None) => {
				log::error!("Receiver closed unexpectedly.");
				return Err(())
			}
			Err(_) => return Ok(newest),
		}
	}
}
fn update<T: Block>(
&mut self,
info: &ClientInfo<T>,
txpool_status: &PoolStatus,
net_status: &NetworkStatus<T>,
net_status: Option<NetworkStatus<T>>,
net_state: Option<NetworkState>,
) {
let now = Instant::now();
let elapsed = (now - self.last_update).as_secs();
self.last_update = now;
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
let num_peers = net_status.num_connected_peers;
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
let total_bytes_inbound = net_status.total_bytes_inbound;
let total_bytes_outbound = net_status.total_bytes_outbound;
let best_seen_block = net_status
.best_seen_block
.map(|num: NumberFor<T>| num.unique_saturated_into() as u64);
let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound;
let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound;
let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) =
if elapsed > 0 {
self.last_total_bytes_inbound = total_bytes_inbound;
self.last_total_bytes_outbound = total_bytes_outbound;
(diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed)
} else {
(diff_bytes_inbound, diff_bytes_outbound)
};
self.last_update = now;
// Update/send metrics that are always available.
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"peers" => num_peers,
"height" => best_number,
"best" => ?best_hash,
"txcount" => txpool_status.ready,
"finalized_height" => finalized_number,
"finalized_hash" => ?info.chain.finalized_hash,
"bandwidth_download" => avg_bytes_per_sec_inbound,
"bandwidth_upload" => avg_bytes_per_sec_outbound,
"used_state_cache_size" => info.usage.as_ref()
.map(|usage| usage.memory.state_cache.as_bytes())
.unwrap_or(0),
"used_db_cache_size" => info.usage.as_ref()
.map(|usage| usage.memory.database_cache.as_bytes())
.unwrap_or(0),
"disk_read_per_sec" => info.usage.as_ref()
.map(|usage| usage.io.bytes_read)
.unwrap_or(0),
"disk_write_per_sec" => info.usage.as_ref()
.map(|usage| usage.io.bytes_written)
.unwrap_or(0),
);
if let Some(metrics) = self.metrics.as_ref() {
@@ -213,10 +274,6 @@ impl MetricsService {
metrics.ready_transactions_number.set(txpool_status.ready as u64);
if let Some(best_seen_block) = best_seen_block {
metrics.block_height.with_label_values(&["sync_target"]).set(best_seen_block);
}
if let Some(info) = info.usage.as_ref() {
metrics.database_cache.set(info.memory.database_cache.as_bytes() as u64);
metrics.state_cache.set(info.memory.state_cache.as_bytes() as u64);
@@ -232,5 +289,50 @@ impl MetricsService {
);
}
}
// Update/send network status information, if any.
if let Some(net_status) = net_status {
let num_peers = net_status.num_connected_peers;
let total_bytes_inbound = net_status.total_bytes_inbound;
let total_bytes_outbound = net_status.total_bytes_outbound;
let diff_bytes_inbound = total_bytes_inbound - self.last_total_bytes_inbound;
let diff_bytes_outbound = total_bytes_outbound - self.last_total_bytes_outbound;
let (avg_bytes_per_sec_inbound, avg_bytes_per_sec_outbound) =
if elapsed > 0 {
self.last_total_bytes_inbound = total_bytes_inbound;
self.last_total_bytes_outbound = total_bytes_outbound;
(diff_bytes_inbound / elapsed, diff_bytes_outbound / elapsed)
} else {
(diff_bytes_inbound, diff_bytes_outbound)
};
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"peers" => num_peers,
"bandwidth_download" => avg_bytes_per_sec_inbound,
"bandwidth_upload" => avg_bytes_per_sec_outbound,
);
if let Some(metrics) = self.metrics.as_ref() {
let best_seen_block = net_status
.best_seen_block
.map(|num: NumberFor<T>| num.unique_saturated_into() as u64);
if let Some(best_seen_block) = best_seen_block {
metrics.block_height.with_label_values(&["sync_target"]).set(best_seen_block);
}
}
}
// Send network state information, if any.
if let Some(net_state) = net_state {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => net_state,
);
}
}
}