Relay dashboard (#191)

* expose metrics for Prometheus

* added preconfigured configs for Prometheus and Grafana

* metrics-related cli args

* fix compilation
This commit is contained in:
Svyatoslav Nikolsky
2020-07-17 21:48:59 +03:00
committed by Bastian Köcher
parent bebb5e6035
commit 8121b3f82b
15 changed files with 744 additions and 1 deletions
+14
View File
@@ -43,6 +43,17 @@ subcommands:
long: sub-signer-password
value_name: SUB_SIGNER_PASSWORD
help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node.
- no-prometheus: &no-prometheus
long: no-prometheus
help: Do not expose a Prometheus metric endpoint.
- prometheus-host: &prometheus-host
long: prometheus-host
value_name: PROMETHEUS_HOST
help: Expose Prometheus endpoint at given interface.
- prometheus-port: &prometheus-port
long: prometheus-port
value_name: PROMETHEUS_PORT
help: Expose Prometheus endpoint at given port.
- sub-to-eth:
about: Synchronize headers from Substrate node to Ethereum node.
args:
@@ -63,6 +74,9 @@ subcommands:
help: Hex-encoded secret to use when transactions are submitted to the Ethereum node.
- sub-host: *sub-host
- sub-port: *sub-port
- no-prometheus: *no-prometheus
- prometheus-host: *prometheus-host
- prometheus-port: *prometheus-port
- eth-deploy-contract:
about: Deploy Bridge contract on Ethereum node.
args:
@@ -18,6 +18,7 @@
use crate::ethereum_client::{EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient};
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt};
use crate::metrics::MetricsParams;
use crate::rpc::{EthereumRpc, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_client::{
@@ -61,6 +62,8 @@ pub struct EthereumSyncParams {
pub sub_sign: SubstrateSigningParams,
/// Synchronization parameters.
pub sync_params: HeadersSyncParams,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
}
impl Default for EthereumSyncParams {
@@ -77,6 +80,7 @@ impl Default for EthereumSyncParams {
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
},
metrics_params: Some(Default::default()),
}
}
}
@@ -204,6 +208,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
target,
SUBSTRATE_TICK_INTERVAL,
params.sync_params,
params.metrics_params,
futures::future::pending(),
);
+23
View File
@@ -23,6 +23,7 @@ mod ethereum_sync_loop;
mod ethereum_types;
mod exchange;
mod headers;
mod metrics;
mod rpc;
mod rpc_errors;
mod substrate_client;
@@ -30,6 +31,7 @@ mod substrate_sync_loop;
mod substrate_types;
mod sync;
mod sync_loop;
mod sync_loop_metrics;
mod sync_loop_tests;
mod sync_types;
mod utils;
@@ -199,6 +201,7 @@ fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<EthereumSyncParams
eth_sync_params.eth = ethereum_connection_params(matches)?;
eth_sync_params.sub = substrate_connection_params(matches)?;
eth_sync_params.sub_sign = substrate_signing_params(matches)?;
eth_sync_params.metrics_params = metrics_params(matches)?;
match matches.value_of("sub-tx-mode") {
Some("signed") => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
@@ -223,6 +226,7 @@ fn substrate_sync_params(matches: &clap::ArgMatches) -> Result<SubstrateSyncPara
sub_sync_params.eth = ethereum_connection_params(matches)?;
sub_sync_params.eth_sign = ethereum_signing_params(matches)?;
sub_sync_params.sub = substrate_connection_params(matches)?;
sub_sync_params.metrics_params = metrics_params(matches)?;
if let Some(eth_contract) = matches.value_of("eth-contract") {
sub_sync_params.eth_contract_address = eth_contract.parse().map_err(|e| format!("{}", e))?;
@@ -265,3 +269,22 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result<ethereum_excha
Ok(params)
}
fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<metrics::MetricsParams>, String> {
if matches.is_present("no-prometheus") {
return Ok(None);
}
let mut metrics_params = metrics::MetricsParams::default();
if let Some(prometheus_host) = matches.value_of("prometheus-host") {
metrics_params.host = prometheus_host.into();
}
if let Some(prometheus_port) = matches.value_of("prometheus-port") {
metrics_params.port = prometheus_port
.parse()
.map_err(|e| format!("Failed to parse prometheus-port: {}", e))?;
}
Ok(Some(metrics_params))
}
+119
View File
@@ -0,0 +1,119 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
pub use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64};
use std::net::SocketAddr;
use substrate_prometheus_endpoint::init_prometheus;
use sysinfo::{ProcessExt, System, SystemExt};
/// Prometheus endpoint MetricsParams.
#[derive(Debug, Clone)]
pub struct MetricsParams {
/// Serve HTTP requests at given host.
pub host: String,
/// Serve HTTP requests at given port.
pub port: u16,
}
/// Global Prometheus metrics.
#[derive(Debug)]
pub struct GlobalMetrics {
system: System,
system_average_load: GaugeVec<F64>,
process_cpu_usage_percentage: Gauge<F64>,
process_memory_usage_bytes: Gauge<U64>,
}
/// Start Prometheus endpoint with given metrics registry.
pub async fn start(params: MetricsParams, registry: Registry) -> Result<(), String> {
init_prometheus(
SocketAddr::new(
params
.host
.parse()
.map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?,
params.port,
),
registry,
)
.await
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
}
impl Default for MetricsParams {
fn default() -> Self {
MetricsParams {
host: "127.0.0.1".into(),
port: 9616,
}
}
}
impl GlobalMetrics {
/// Creates global metrics.
pub fn new() -> Self {
GlobalMetrics {
system: System::new(),
system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
.expect("metric is static and thus valid; qed"),
process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage")
.expect("metric is static and thus valid; qed"),
process_memory_usage_bytes: Gauge::new(
"process_memory_usage_bytes",
"Process memory (resident set size) usage",
)
.expect("metric is static and thus valid; qed"),
}
}
/// Registers global metrics in the metrics registry.
pub fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
/// Update metrics.
pub fn update(&mut self) {
// update system-wide metrics
let load = self.system.get_load_average();
self.system_average_load.with_label_values(&["1min"]).set(load.one);
self.system_average_load.with_label_values(&["5min"]).set(load.five);
self.system_average_load.with_label_values(&["15min"]).set(load.fifteen);
// update process-related metrics
let pid = sysinfo::get_current_pid().expect(
"only fails where pid is unavailable (os=unknown || arch=wasm32);\
relay is not supposed to run in such MetricsParamss;\
qed",
);
let is_process_refreshed = self.system.refresh_process(pid);
match (is_process_refreshed, self.system.get_process(pid)) {
(true, Some(process_info)) => {
self.process_cpu_usage_percentage.set(process_info.cpu_usage() as f64);
self.process_memory_usage_bytes.set(process_info.memory() * 1024);
}
_ => {
log::warn!(
target: "bridge",
"Failed to refresh process information. Metrics may show obsolete values",
);
}
}
}
}
@@ -20,6 +20,7 @@ use crate::ethereum_client::{
EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient, EthereumSigningParams,
};
use crate::ethereum_types::Address;
use crate::metrics::MetricsParams;
use crate::rpc::SubstrateRpc;
use crate::rpc_errors::RpcError;
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
@@ -58,6 +59,8 @@ pub struct SubstrateSyncParams {
pub sub: SubstrateConnectionParams,
/// Synchronization parameters.
pub sync_params: HeadersSyncParams,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
}
impl Default for SubstrateSyncParams {
@@ -81,6 +84,7 @@ impl Default for SubstrateSyncParams {
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
},
metrics_params: Some(Default::default()),
}
}
}
@@ -209,6 +213,7 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
target,
ETHEREUM_TICK_INTERVAL,
params.sync_params,
params.metrics_params,
futures::future::pending(),
);
+10
View File
@@ -72,6 +72,16 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
}
}
/// Return best header number known to source node.
pub fn source_best_number(&self) -> Option<P::Number> {
self.source_best_number
}
/// Best header known to target node.
pub fn target_best_header(&self) -> Option<HeaderId<P::Hash, P::Number>> {
self.target_best_header
}
/// Returns true if we have synced almost all known headers.
pub fn is_almost_synced(&self) -> bool {
match self.source_best_number {
+35
View File
@@ -14,7 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams, Registry as MetricsRegistry};
use crate::sync::HeadersSyncParams;
use crate::sync_loop_metrics::SyncLoopMetrics;
use crate::sync_types::{
HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader, SubmittedHeaders,
};
@@ -124,6 +126,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_client: TC,
target_tick: Duration,
sync_params: HeadersSyncParams,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
let mut local_pool = futures::executor::LocalPool::new();
@@ -134,6 +137,19 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut stall_countdown = None;
let mut last_update_time = Instant::now();
let mut metrics_global = GlobalMetrics::new();
let mut metrics_sync = SyncLoopMetrics::new();
let metrics_enabled = metrics_params.is_some();
if let Some(metrics_params) = metrics_params {
if let Err(err) = expose_metrics(metrics_params, &metrics_global, &metrics_sync).await {
log::warn!(
target: "bridge",
"Failed to expose metrics: {}",
err,
);
}
}
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = false;
let mut source_best_block_number_required = false;
@@ -361,6 +377,12 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
}
}
// update metrics
if metrics_enabled {
metrics_global.update();
metrics_sync.update(&mut sync);
}
// print progress
progress_context = print_sync_progress(progress_context, &sync);
@@ -540,6 +562,19 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
});
}
/// Expose sync loop metrics.
async fn expose_metrics(
metrics_params: MetricsParams,
metrics_global: &GlobalMetrics,
metrics_sync: &SyncLoopMetrics,
) -> Result<(), String> {
let metrics_registry = MetricsRegistry::new();
metrics_global.register(&metrics_registry)?;
metrics_sync.register(&metrics_registry)?;
async_std::task::spawn(metrics_start(metrics_params, metrics_registry));
Ok(())
}
/// Stream that emits item every `timeout_ms` milliseconds.
fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
@@ -0,0 +1,90 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{register, GaugeVec, Opts, Registry, U64};
use crate::sync::HeadersSync;
use crate::sync_types::{HeaderStatus, HeadersSyncPipeline};
use num_traits::Zero;
/// Headers sync metrics.
pub struct SyncLoopMetrics {
/// Best syncing headers at "source" and "target" nodes.
best_block_numbers: GaugeVec<U64>,
/// Number of headers in given states (see `HeaderStatus`).
blocks_in_state: GaugeVec<U64>,
}
impl SyncLoopMetrics {
/// Creates sync loop metrics.
pub fn new() -> Self {
SyncLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best block numbers on source and target nodes"),
&["node"],
)
.expect("metric is static and thus valid; qed"),
blocks_in_state: GaugeVec::new(
Opts::new("blocks_in_state", "Number of blocks in given state"),
&["state"],
)
.expect("metric is static and thus valid; qed"),
}
}
/// Registers sync loop metrics in the metrics registry.
pub fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
/// Update metrics.
pub fn update<P: HeadersSyncPipeline>(&mut self, sync: &HeadersSync<P>) {
let headers = sync.headers();
let source_best_number = sync.source_best_number().unwrap_or(Zero::zero());
let target_best_number = sync.target_best_header().map(|id| id.0).unwrap_or(Zero::zero());
self.best_block_numbers
.with_label_values(&["source"])
.set(source_best_number.into());
self.best_block_numbers
.with_label_values(&["target"])
.set(target_best_number.into());
self.blocks_in_state
.with_label_values(&["maybe_orphan"])
.set(headers.headers_in_status(HeaderStatus::MaybeOrphan) as _);
self.blocks_in_state
.with_label_values(&["orphan"])
.set(headers.headers_in_status(HeaderStatus::Orphan) as _);
self.blocks_in_state
.with_label_values(&["maybe_extra"])
.set(headers.headers_in_status(HeaderStatus::MaybeExtra) as _);
self.blocks_in_state
.with_label_values(&["extra"])
.set(headers.headers_in_status(HeaderStatus::Extra) as _);
self.blocks_in_state
.with_label_values(&["ready"])
.set(headers.headers_in_status(HeaderStatus::Ready) as _);
self.blocks_in_state
.with_label_values(&["incomplete"])
.set(headers.headers_in_status(HeaderStatus::Incomplete) as _);
self.blocks_in_state
.with_label_values(&["submitted"])
.set(headers.headers_in_status(HeaderStatus::Submitted) as _);
}
}
@@ -481,6 +481,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) {
target,
test_tick(),
crate::sync::tests::default_sync_params(),
None,
exit_receiver.into_future().map(|(_, _)| ()),
);
}
+2 -1
View File
@@ -70,7 +70,8 @@ pub trait HeadersSyncPipeline: Clone + Copy {
+ std::ops::Sub<Output = Self::Number>
+ num_traits::Saturating
+ num_traits::Zero
+ num_traits::One;
+ num_traits::One
+ Into<u64>;
/// Type of header that we're syncing.
type Header: Clone + std::fmt::Debug + PartialEq + SourceHeader<Self::Hash, Self::Number>;
/// Type of extra data for the header that we're receiving from the source node: