From 0be27839bc90b68f45bd3277ab1a6554ed1f7f77 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 28 Jul 2020 14:08:20 +0300 Subject: [PATCH] Auto exchange transactions relay metrics (#237) * auto exchange tx relay dashboard * cargo fmt --all * single metrics startup fn --- bridges/relays/ethereum/src/cli.yml | 3 + .../relays/ethereum/src/ethereum_exchange.rs | 5 ++ bridges/relays/ethereum/src/exchange.rs | 9 +- bridges/relays/ethereum/src/exchange_loop.rs | 30 ++++++- .../ethereum/src/exchange_loop_metrics.rs | 84 +++++++++++++++++++ bridges/relays/ethereum/src/main.rs | 2 + bridges/relays/ethereum/src/metrics.rs | 62 ++++++++++---- bridges/relays/ethereum/src/sync_loop.rs | 25 +----- .../relays/ethereum/src/sync_loop_metrics.rs | 17 ++-- 9 files changed, 187 insertions(+), 50 deletions(-) create mode 100644 bridges/relays/ethereum/src/exchange_loop_metrics.rs diff --git a/bridges/relays/ethereum/src/cli.yml b/bridges/relays/ethereum/src/cli.yml index 2026da39ef..6e11c70ada 100644 --- a/bridges/relays/ethereum/src/cli.yml +++ b/bridges/relays/ethereum/src/cli.yml @@ -127,3 +127,6 @@ subcommands: - sub-port: *sub-port - sub-signer: *sub-signer - sub-signer-password: *sub-signer-password + - no-prometheus: *no-prometheus + - prometheus-host: *prometheus-host + - prometheus-port: *prometheus-port diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index 62e2d3eefb..9661f28f19 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -26,6 +26,7 @@ use crate::exchange::{ TransactionProofPipeline, }; use crate::exchange_loop::{run as run_loop, InMemoryStorage}; +use crate::metrics::MetricsParams; use crate::rpc::{EthereumRpc, SubstrateRpc}; use crate::rpc_errors::RpcError; use crate::substrate_client::{ @@ -64,6 +65,8 @@ pub struct EthereumExchangeParams { pub sub_sign: SubstrateSigningParams, /// Relay working mode. pub mode: ExchangeRelayMode, + /// Metrics parameters. + pub metrics_params: Option, } /// Ethereum to Substrate exchange pipeline. @@ -257,6 +260,7 @@ impl Default for EthereumExchangeParams { sub: Default::default(), sub_sign: Default::default(), mode: ExchangeRelayMode::Auto(None), + metrics_params: Some(Default::default()), } } } @@ -335,6 +339,7 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi client: sub_client, sign_params: params.sub_sign, }, + params.metrics_params, futures::future::pending(), ); diff --git a/bridges/relays/ethereum/src/exchange.rs b/bridges/relays/ethereum/src/exchange.rs index 4de47e5975..58ddcf6b66 100644 --- a/bridges/relays/ethereum/src/exchange.rs +++ b/bridges/relays/ethereum/src/exchange.rs @@ -42,7 +42,14 @@ pub trait SourceBlock { /// Block hash type. type Hash: Clone + Debug + Display; /// Block number type. - type Number: Debug + Display + Clone + Copy + std::cmp::Ord + std::ops::Add + num_traits::One; + type Number: Debug + + Display + + Clone + + Copy + + Into + + std::cmp::Ord + + std::ops::Add + + num_traits::One; /// Block transaction. type Transaction: SourceTransaction; diff --git a/bridges/relays/ethereum/src/exchange_loop.rs b/bridges/relays/ethereum/src/exchange_loop.rs index 678211d7c1..b089e70328 100644 --- a/bridges/relays/ethereum/src/exchange_loop.rs +++ b/bridges/relays/ethereum/src/exchange_loop.rs @@ -20,6 +20,8 @@ use crate::exchange::{ relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient, TargetClient, TransactionProofPipeline, }; +use crate::exchange_loop_metrics::ExchangeLoopMetrics; +use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams}; use crate::utils::retry_backoff; use backoff::backoff::Backoff; @@ -83,6 +85,7 @@ pub fn run( mut storage: impl TransactionProofsRelayStorage>, source_client: impl SourceClient

, target_client: impl TargetClient

, + metrics_params: Option, exit_signal: impl Future, ) { let mut local_pool = futures::executor::LocalPool::new(); @@ -92,6 +95,11 @@ pub fn run( let mut state = storage.state(); let mut current_finalized_block = None; + let mut metrics_global = GlobalMetrics::new(); + let mut metrics_exch = ExchangeLoopMetrics::new(); + let metrics_enabled = metrics_params.is_some(); + metrics_start(metrics_params, &metrics_global, &metrics_exch); + let exit_signal = exit_signal.fuse(); futures::pin_mut!(exit_signal); @@ -103,9 +111,14 @@ pub fn run( &target_client, &mut state, &mut current_finalized_block, + if metrics_enabled { Some(&mut metrics_exch) } else { None }, ) .await; + if metrics_enabled { + metrics_global.update(); + } + match iteration_result { Ok(_) => { retry_backoff.reset(); @@ -135,6 +148,7 @@ async fn run_loop_iteration( target_client: &impl TargetClient

, state: &mut TransactionProofsRelayState>, current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>, + mut exchange_loop_metrics: Option<&mut ExchangeLoopMetrics>, ) -> Result<(), ()> { let best_finalized_header_id = match target_client.best_finalized_header_id().await { Ok(best_finalized_header_id) => { @@ -181,6 +195,14 @@ async fn run_loop_iteration( state.best_processed_header_number = state.best_processed_header_number + One::one(); storage.set_state(state); + if let Some(exchange_loop_metrics) = exchange_loop_metrics.as_mut() { + exchange_loop_metrics.update::

( + state.best_processed_header_number, + best_finalized_header_id.0, + relayed_transactions, + ); + } + // we have just updated state => proceed to next block retrieval } Err(relayed_transactions) => { @@ -262,6 +284,12 @@ mod tests { } })); - run(storage, source, target, exit_receiver.into_future().map(|(_, _)| ())); + run( + storage, + source, + target, + None, + exit_receiver.into_future().map(|(_, _)| ()), + ); } } diff --git a/bridges/relays/ethereum/src/exchange_loop_metrics.rs b/bridges/relays/ethereum/src/exchange_loop_metrics.rs new file mode 100644 index 0000000000..438360e224 --- /dev/null +++ b/bridges/relays/ethereum/src/exchange_loop_metrics.rs @@ -0,0 +1,84 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline}; +use crate::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64}; + +/// Exchange transactions relay metrics. +pub struct ExchangeLoopMetrics { + /// Best finalized block numbers - "processed" and "known". + best_block_numbers: GaugeVec, + /// Number of processed blocks ("total"). + processed_blocks: Counter, + /// Number of processed transactions ("total", "relayed" and "failed"). + processed_transactions: CounterVec, +} + +impl Metrics for ExchangeLoopMetrics { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?; + register(self.processed_blocks.clone(), registry).map_err(|e| e.to_string())?; + register(self.processed_transactions.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + +impl ExchangeLoopMetrics { + /// Creates sync loop metrics. + pub fn new() -> Self { + ExchangeLoopMetrics { + best_block_numbers: GaugeVec::new( + Opts::new("best_block_numbers", "Best finalized block numbers"), + &["type"], + ) + .expect("metric is static and thus valid; qed"), + processed_blocks: Counter::new("processed_blocks", "Total number of processed blocks") + .expect("metric is static and thus valid; qed"), + processed_transactions: CounterVec::new( + Opts::new("processed_transactions", "Total number of processed transactions"), + &["type"], + ) + .expect("metric is static and thus valid; qed"), + } + } + + /// Update metrics when single block is relayed. + pub fn update( + &mut self, + best_processed_block_number: BlockNumberOf

, + best_known_block_number: BlockNumberOf

, + relayed_transactions: RelayedBlockTransactions, + ) { + self.best_block_numbers + .with_label_values(&["processed"]) + .set(best_processed_block_number.into()); + self.best_block_numbers + .with_label_values(&["known"]) + .set(best_known_block_number.into()); + + self.processed_blocks.inc(); + + self.processed_transactions + .with_label_values(&["total"]) + .inc_by(relayed_transactions.processed as _); + self.processed_transactions + .with_label_values(&["relayed"]) + .inc_by(relayed_transactions.relayed as _); + self.processed_transactions + .with_label_values(&["failed"]) + .inc_by(relayed_transactions.failed as _); + } +} diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index 403ff101f8..caddcfb9ca 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -23,6 +23,7 @@ mod ethereum_sync_loop; mod ethereum_types; mod exchange; mod exchange_loop; +mod exchange_loop_metrics; mod headers; mod metrics; mod rpc; @@ -260,6 +261,7 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result ethereum_exchange::ExchangeRelayMode::Single( diff --git a/bridges/relays/ethereum/src/metrics.rs b/bridges/relays/ethereum/src/metrics.rs index 95e84fc647..084b5ae10f 100644 --- a/bridges/relays/ethereum/src/metrics.rs +++ b/bridges/relays/ethereum/src/metrics.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -pub use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64}; +pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64}; use std::net::SocketAddr; use substrate_prometheus_endpoint::init_prometheus; @@ -29,6 +29,12 @@ pub struct MetricsParams { pub port: u16, } +/// Metrics API. +pub trait Metrics { + /// Register metrics in the registry. + fn register(&self, registry: &Registry) -> Result<(), String>; +} + /// Global Prometheus metrics. #[derive(Debug)] pub struct GlobalMetrics { @@ -39,19 +45,40 @@ pub struct GlobalMetrics { } /// Start Prometheus endpoint with given metrics registry. -pub async fn start(params: MetricsParams, registry: Registry) -> Result<(), String> { - init_prometheus( - SocketAddr::new( +pub fn start(params: Option, global_metrics: &GlobalMetrics, extra_metrics: &impl Metrics) { + let params = match params { + Some(params) => params, + None => return, + }; + + let do_start = move || { + let prometheus_socket_addr = SocketAddr::new( params .host .parse() .map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?, params.port, - ), - registry, - ) - .await - .map_err(|err| format!("Error starting Prometheus endpoint: {}", err)) + ); + let metrics_registry = Registry::new(); + global_metrics.register(&metrics_registry)?; + extra_metrics.register(&metrics_registry)?; + async_std::task::spawn(async move { + init_prometheus(prometheus_socket_addr, metrics_registry) + .await + .map_err(|err| format!("Error starting Prometheus endpoint: {}", err)) + }); + + Ok(()) + }; + + let result: Result<(), String> = do_start(); + if let Err(err) = result { + log::warn!( + target: "bridge", + "Failed to expose metrics: {}", + err, + ); + } } impl Default for MetricsParams { @@ -63,6 +90,15 @@ impl Default for MetricsParams { } } +impl Metrics for GlobalMetrics { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?; + register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?; + register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + impl GlobalMetrics { /// Creates global metrics. pub fn new() -> Self { @@ -80,14 +116,6 @@ impl GlobalMetrics { } } - /// Registers global metrics in the metrics registry. - pub fn register(&self, registry: &Registry) -> Result<(), String> { - register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?; - register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?; - register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?; - Ok(()) - } - /// Update metrics. pub fn update(&mut self) { // update system-wide metrics diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index b2c5d2a5ee..95a6171544 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams, Registry as MetricsRegistry}; +use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams}; use crate::sync::HeadersSyncParams; use crate::sync_loop_metrics::SyncLoopMetrics; use crate::sync_types::{ @@ -124,15 +124,7 @@ pub fn run>( let mut metrics_global = GlobalMetrics::new(); let mut metrics_sync = SyncLoopMetrics::new(); let metrics_enabled = metrics_params.is_some(); - if let Some(metrics_params) = metrics_params { - if let Err(err) = expose_metrics(metrics_params, &metrics_global, &metrics_sync).await { - log::warn!( - target: "bridge", - "Failed to expose metrics: {}", - err, - ); - } - } + metrics_start(metrics_params, &metrics_global, &metrics_sync); let mut source_retry_backoff = retry_backoff(); let mut source_client_is_online = false; @@ -546,19 +538,6 @@ pub fn run>( }); } -/// Expose sync loop metrics. -async fn expose_metrics( - metrics_params: MetricsParams, - metrics_global: &GlobalMetrics, - metrics_sync: &SyncLoopMetrics, -) -> Result<(), String> { - let metrics_registry = MetricsRegistry::new(); - metrics_global.register(&metrics_registry)?; - metrics_sync.register(&metrics_registry)?; - async_std::task::spawn(metrics_start(metrics_params, metrics_registry)); - Ok(()) -} - /// Stream that emits item every `timeout_ms` milliseconds. fn interval(timeout: Duration) -> impl futures::Stream { futures::stream::unfold((), move |_| async move { diff --git a/bridges/relays/ethereum/src/sync_loop_metrics.rs b/bridges/relays/ethereum/src/sync_loop_metrics.rs index fa34c8b511..dc512a46e4 100644 --- a/bridges/relays/ethereum/src/sync_loop_metrics.rs +++ b/bridges/relays/ethereum/src/sync_loop_metrics.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::metrics::{register, GaugeVec, Opts, Registry, U64}; +use crate::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64}; use crate::sync::HeadersSync; use crate::sync_types::{HeaderStatus, HeadersSyncPipeline}; @@ -28,6 +28,14 @@ pub struct SyncLoopMetrics { blocks_in_state: GaugeVec, } +impl Metrics for SyncLoopMetrics { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?; + register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + impl SyncLoopMetrics { /// Creates sync loop metrics. pub fn new() -> Self { @@ -45,13 +53,6 @@ impl SyncLoopMetrics { } } - /// Registers sync loop metrics in the metrics registry. - pub fn register(&self, registry: &Registry) -> Result<(), String> { - register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?; - register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?; - Ok(()) - } - /// Update metrics. pub fn update(&mut self, sync: &HeadersSync

) { let headers = sync.headers();