mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 21:01:02 +00:00
Auto exchange transactions relay metrics (#237)
* auto exchange tx relay dashboard * cargo fmt --all * single metrics startup fn
This commit is contained in:
committed by
Bastian Köcher
parent
156ec9862f
commit
0be27839bc
@@ -127,3 +127,6 @@ subcommands:
|
||||
- sub-port: *sub-port
|
||||
- sub-signer: *sub-signer
|
||||
- sub-signer-password: *sub-signer-password
|
||||
- no-prometheus: *no-prometheus
|
||||
- prometheus-host: *prometheus-host
|
||||
- prometheus-port: *prometheus-port
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::exchange::{
|
||||
TransactionProofPipeline,
|
||||
};
|
||||
use crate::exchange_loop::{run as run_loop, InMemoryStorage};
|
||||
use crate::metrics::MetricsParams;
|
||||
use crate::rpc::{EthereumRpc, SubstrateRpc};
|
||||
use crate::rpc_errors::RpcError;
|
||||
use crate::substrate_client::{
|
||||
@@ -64,6 +65,8 @@ pub struct EthereumExchangeParams {
|
||||
pub sub_sign: SubstrateSigningParams,
|
||||
/// Relay working mode.
|
||||
pub mode: ExchangeRelayMode,
|
||||
/// Metrics parameters.
|
||||
pub metrics_params: Option<MetricsParams>,
|
||||
}
|
||||
|
||||
/// Ethereum to Substrate exchange pipeline.
|
||||
@@ -257,6 +260,7 @@ impl Default for EthereumExchangeParams {
|
||||
sub: Default::default(),
|
||||
sub_sign: Default::default(),
|
||||
mode: ExchangeRelayMode::Auto(None),
|
||||
metrics_params: Some(Default::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -335,6 +339,7 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi
|
||||
client: sub_client,
|
||||
sign_params: params.sub_sign,
|
||||
},
|
||||
params.metrics_params,
|
||||
futures::future::pending(),
|
||||
);
|
||||
|
||||
|
||||
@@ -42,7 +42,14 @@ pub trait SourceBlock {
|
||||
/// Block hash type.
|
||||
type Hash: Clone + Debug + Display;
|
||||
/// Block number type.
|
||||
type Number: Debug + Display + Clone + Copy + std::cmp::Ord + std::ops::Add<Output = Self::Number> + num_traits::One;
|
||||
type Number: Debug
|
||||
+ Display
|
||||
+ Clone
|
||||
+ Copy
|
||||
+ Into<u64>
|
||||
+ std::cmp::Ord
|
||||
+ std::ops::Add<Output = Self::Number>
|
||||
+ num_traits::One;
|
||||
/// Block transaction.
|
||||
type Transaction: SourceTransaction;
|
||||
|
||||
|
||||
@@ -20,6 +20,8 @@ use crate::exchange::{
|
||||
relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient, TargetClient,
|
||||
TransactionProofPipeline,
|
||||
};
|
||||
use crate::exchange_loop_metrics::ExchangeLoopMetrics;
|
||||
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
|
||||
use crate::utils::retry_backoff;
|
||||
|
||||
use backoff::backoff::Backoff;
|
||||
@@ -83,6 +85,7 @@ pub fn run<P: TransactionProofPipeline>(
|
||||
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
|
||||
source_client: impl SourceClient<P>,
|
||||
target_client: impl TargetClient<P>,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) {
|
||||
let mut local_pool = futures::executor::LocalPool::new();
|
||||
@@ -92,6 +95,11 @@ pub fn run<P: TransactionProofPipeline>(
|
||||
let mut state = storage.state();
|
||||
let mut current_finalized_block = None;
|
||||
|
||||
let mut metrics_global = GlobalMetrics::new();
|
||||
let mut metrics_exch = ExchangeLoopMetrics::new();
|
||||
let metrics_enabled = metrics_params.is_some();
|
||||
metrics_start(metrics_params, &metrics_global, &metrics_exch);
|
||||
|
||||
let exit_signal = exit_signal.fuse();
|
||||
|
||||
futures::pin_mut!(exit_signal);
|
||||
@@ -103,9 +111,14 @@ pub fn run<P: TransactionProofPipeline>(
|
||||
&target_client,
|
||||
&mut state,
|
||||
&mut current_finalized_block,
|
||||
if metrics_enabled { Some(&mut metrics_exch) } else { None },
|
||||
)
|
||||
.await;
|
||||
|
||||
if metrics_enabled {
|
||||
metrics_global.update();
|
||||
}
|
||||
|
||||
match iteration_result {
|
||||
Ok(_) => {
|
||||
retry_backoff.reset();
|
||||
@@ -135,6 +148,7 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
|
||||
target_client: &impl TargetClient<P>,
|
||||
state: &mut TransactionProofsRelayState<BlockNumberOf<P>>,
|
||||
current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>,
|
||||
mut exchange_loop_metrics: Option<&mut ExchangeLoopMetrics>,
|
||||
) -> Result<(), ()> {
|
||||
let best_finalized_header_id = match target_client.best_finalized_header_id().await {
|
||||
Ok(best_finalized_header_id) => {
|
||||
@@ -181,6 +195,14 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
|
||||
state.best_processed_header_number = state.best_processed_header_number + One::one();
|
||||
storage.set_state(state);
|
||||
|
||||
if let Some(exchange_loop_metrics) = exchange_loop_metrics.as_mut() {
|
||||
exchange_loop_metrics.update::<P>(
|
||||
state.best_processed_header_number,
|
||||
best_finalized_header_id.0,
|
||||
relayed_transactions,
|
||||
);
|
||||
}
|
||||
|
||||
// we have just updated state => proceed to next block retrieval
|
||||
}
|
||||
Err(relayed_transactions) => {
|
||||
@@ -262,6 +284,12 @@ mod tests {
|
||||
}
|
||||
}));
|
||||
|
||||
run(storage, source, target, exit_receiver.into_future().map(|(_, _)| ()));
|
||||
run(
|
||||
storage,
|
||||
source,
|
||||
target,
|
||||
None,
|
||||
exit_receiver.into_future().map(|(_, _)| ()),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity Bridges Common.
|
||||
|
||||
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity Bridges Common is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
|
||||
use crate::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
|
||||
|
||||
/// Exchange transactions relay metrics.
|
||||
pub struct ExchangeLoopMetrics {
|
||||
/// Best finalized block numbers - "processed" and "known".
|
||||
best_block_numbers: GaugeVec<U64>,
|
||||
/// Number of processed blocks ("total").
|
||||
processed_blocks: Counter<U64>,
|
||||
/// Number of processed transactions ("total", "relayed" and "failed").
|
||||
processed_transactions: CounterVec<U64>,
|
||||
}
|
||||
|
||||
impl Metrics for ExchangeLoopMetrics {
|
||||
fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.processed_blocks.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.processed_transactions.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ExchangeLoopMetrics {
|
||||
/// Creates sync loop metrics.
|
||||
pub fn new() -> Self {
|
||||
ExchangeLoopMetrics {
|
||||
best_block_numbers: GaugeVec::new(
|
||||
Opts::new("best_block_numbers", "Best finalized block numbers"),
|
||||
&["type"],
|
||||
)
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
processed_blocks: Counter::new("processed_blocks", "Total number of processed blocks")
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
processed_transactions: CounterVec::new(
|
||||
Opts::new("processed_transactions", "Total number of processed transactions"),
|
||||
&["type"],
|
||||
)
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update metrics when single block is relayed.
|
||||
pub fn update<P: TransactionProofPipeline>(
|
||||
&mut self,
|
||||
best_processed_block_number: BlockNumberOf<P>,
|
||||
best_known_block_number: BlockNumberOf<P>,
|
||||
relayed_transactions: RelayedBlockTransactions,
|
||||
) {
|
||||
self.best_block_numbers
|
||||
.with_label_values(&["processed"])
|
||||
.set(best_processed_block_number.into());
|
||||
self.best_block_numbers
|
||||
.with_label_values(&["known"])
|
||||
.set(best_known_block_number.into());
|
||||
|
||||
self.processed_blocks.inc();
|
||||
|
||||
self.processed_transactions
|
||||
.with_label_values(&["total"])
|
||||
.inc_by(relayed_transactions.processed as _);
|
||||
self.processed_transactions
|
||||
.with_label_values(&["relayed"])
|
||||
.inc_by(relayed_transactions.relayed as _);
|
||||
self.processed_transactions
|
||||
.with_label_values(&["failed"])
|
||||
.inc_by(relayed_transactions.failed as _);
|
||||
}
|
||||
}
|
||||
@@ -23,6 +23,7 @@ mod ethereum_sync_loop;
|
||||
mod ethereum_types;
|
||||
mod exchange;
|
||||
mod exchange_loop;
|
||||
mod exchange_loop_metrics;
|
||||
mod headers;
|
||||
mod metrics;
|
||||
mod rpc;
|
||||
@@ -260,6 +261,7 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result<ethereum_excha
|
||||
params.eth = ethereum_connection_params(matches)?;
|
||||
params.sub = substrate_connection_params(matches)?;
|
||||
params.sub_sign = substrate_signing_params(matches)?;
|
||||
params.metrics_params = metrics_params(matches)?;
|
||||
|
||||
params.mode = match matches.value_of("eth-tx-hash") {
|
||||
Some(eth_tx_hash) => ethereum_exchange::ExchangeRelayMode::Single(
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
pub use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64};
|
||||
pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use substrate_prometheus_endpoint::init_prometheus;
|
||||
@@ -29,6 +29,12 @@ pub struct MetricsParams {
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
/// Metrics API.
|
||||
pub trait Metrics {
|
||||
/// Register metrics in the registry.
|
||||
fn register(&self, registry: &Registry) -> Result<(), String>;
|
||||
}
|
||||
|
||||
/// Global Prometheus metrics.
|
||||
#[derive(Debug)]
|
||||
pub struct GlobalMetrics {
|
||||
@@ -39,19 +45,40 @@ pub struct GlobalMetrics {
|
||||
}
|
||||
|
||||
/// Start Prometheus endpoint with given metrics registry.
|
||||
pub async fn start(params: MetricsParams, registry: Registry) -> Result<(), String> {
|
||||
init_prometheus(
|
||||
SocketAddr::new(
|
||||
pub fn start(params: Option<MetricsParams>, global_metrics: &GlobalMetrics, extra_metrics: &impl Metrics) {
|
||||
let params = match params {
|
||||
Some(params) => params,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let do_start = move || {
|
||||
let prometheus_socket_addr = SocketAddr::new(
|
||||
params
|
||||
.host
|
||||
.parse()
|
||||
.map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?,
|
||||
params.port,
|
||||
),
|
||||
registry,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
|
||||
);
|
||||
let metrics_registry = Registry::new();
|
||||
global_metrics.register(&metrics_registry)?;
|
||||
extra_metrics.register(&metrics_registry)?;
|
||||
async_std::task::spawn(async move {
|
||||
init_prometheus(prometheus_socket_addr, metrics_registry)
|
||||
.await
|
||||
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
|
||||
});
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let result: Result<(), String> = do_start();
|
||||
if let Err(err) = result {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to expose metrics: {}",
|
||||
err,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MetricsParams {
|
||||
@@ -63,6 +90,15 @@ impl Default for MetricsParams {
|
||||
}
|
||||
}
|
||||
|
||||
impl Metrics for GlobalMetrics {
|
||||
fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl GlobalMetrics {
|
||||
/// Creates global metrics.
|
||||
pub fn new() -> Self {
|
||||
@@ -80,14 +116,6 @@ impl GlobalMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers global metrics in the metrics registry.
|
||||
pub fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update metrics.
|
||||
pub fn update(&mut self) {
|
||||
// update system-wide metrics
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams, Registry as MetricsRegistry};
|
||||
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
|
||||
use crate::sync::HeadersSyncParams;
|
||||
use crate::sync_loop_metrics::SyncLoopMetrics;
|
||||
use crate::sync_types::{
|
||||
@@ -124,15 +124,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
let mut metrics_global = GlobalMetrics::new();
|
||||
let mut metrics_sync = SyncLoopMetrics::new();
|
||||
let metrics_enabled = metrics_params.is_some();
|
||||
if let Some(metrics_params) = metrics_params {
|
||||
if let Err(err) = expose_metrics(metrics_params, &metrics_global, &metrics_sync).await {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to expose metrics: {}",
|
||||
err,
|
||||
);
|
||||
}
|
||||
}
|
||||
metrics_start(metrics_params, &metrics_global, &metrics_sync);
|
||||
|
||||
let mut source_retry_backoff = retry_backoff();
|
||||
let mut source_client_is_online = false;
|
||||
@@ -546,19 +538,6 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
});
|
||||
}
|
||||
|
||||
/// Expose sync loop metrics.
|
||||
async fn expose_metrics(
|
||||
metrics_params: MetricsParams,
|
||||
metrics_global: &GlobalMetrics,
|
||||
metrics_sync: &SyncLoopMetrics,
|
||||
) -> Result<(), String> {
|
||||
let metrics_registry = MetricsRegistry::new();
|
||||
metrics_global.register(&metrics_registry)?;
|
||||
metrics_sync.register(&metrics_registry)?;
|
||||
async_std::task::spawn(metrics_start(metrics_params, metrics_registry));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stream that emits item every `timeout_ms` milliseconds.
|
||||
fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
|
||||
futures::stream::unfold((), move |_| async move {
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::metrics::{register, GaugeVec, Opts, Registry, U64};
|
||||
use crate::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
|
||||
use crate::sync::HeadersSync;
|
||||
use crate::sync_types::{HeaderStatus, HeadersSyncPipeline};
|
||||
|
||||
@@ -28,6 +28,14 @@ pub struct SyncLoopMetrics {
|
||||
blocks_in_state: GaugeVec<U64>,
|
||||
}
|
||||
|
||||
impl Metrics for SyncLoopMetrics {
|
||||
fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncLoopMetrics {
|
||||
/// Creates sync loop metrics.
|
||||
pub fn new() -> Self {
|
||||
@@ -45,13 +53,6 @@ impl SyncLoopMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers sync loop metrics in the metrics registry.
|
||||
pub fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update metrics.
|
||||
pub fn update<P: HeadersSyncPipeline>(&mut self, sync: &HeadersSync<P>) {
|
||||
let headers = sync.headers();
|
||||
|
||||
Reference in New Issue
Block a user