diff --git a/bridges/bin/millau/runtime/src/rialto_messages.rs b/bridges/bin/millau/runtime/src/rialto_messages.rs index c96b6f610d..9162596e7c 100644 --- a/bridges/bin/millau/runtime/src/rialto_messages.rs +++ b/bridges/bin/millau/runtime/src/rialto_messages.rs @@ -35,9 +35,12 @@ use sp_core::storage::StorageKey; use sp_runtime::{FixedPointNumber, FixedU128}; use sp_std::{convert::TryFrom, ops::RangeInclusive}; +/// Initial value of `RialtoToMillauConversionRate` parameter. +pub const INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE: FixedU128 = FixedU128::from_inner(FixedU128::DIV); + parameter_types! { /// Rialto to Millau conversion rate. Initially we treat both tokens as equal. - storage RialtoToMillauConversionRate: FixedU128 = FixedU128::one(); + pub storage RialtoToMillauConversionRate: FixedU128 = INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE; } /// Storage key of the Millau -> Rialto message in the runtime storage. diff --git a/bridges/bin/rialto/runtime/src/millau_messages.rs b/bridges/bin/rialto/runtime/src/millau_messages.rs index 90a79891e6..62f4ec7147 100644 --- a/bridges/bin/rialto/runtime/src/millau_messages.rs +++ b/bridges/bin/rialto/runtime/src/millau_messages.rs @@ -35,9 +35,12 @@ use sp_core::storage::StorageKey; use sp_runtime::{FixedPointNumber, FixedU128}; use sp_std::{convert::TryFrom, ops::RangeInclusive}; +/// Initial value of `MillauToRialtoConversionRate` parameter. +pub const INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE: FixedU128 = FixedU128::from_inner(FixedU128::DIV); + parameter_types! { /// Millau to Rialto conversion rate. Initially we treat both tokens as equal. - storage MillauToRialtoConversionRate: FixedU128 = 1.into(); + pub storage MillauToRialtoConversionRate: FixedU128 = INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE; } /// Storage key of the Rialto -> Millau message in the runtime storage. diff --git a/bridges/primitives/runtime/src/lib.rs b/bridges/primitives/runtime/src/lib.rs index 5d5469017d..e7f990d283 100644 --- a/bridges/primitives/runtime/src/lib.rs +++ b/bridges/primitives/runtime/src/lib.rs @@ -24,7 +24,7 @@ use sp_io::hashing::blake2_256; use sp_std::convert::TryFrom; pub use chain::{BlockNumberOf, Chain, HashOf, HasherOf, HeaderOf}; -pub use storage_proof::StorageProofChecker; +pub use storage_proof::{Error as StorageProofError, StorageProofChecker}; #[cfg(feature = "std")] pub use storage_proof::craft_valid_storage_proof; diff --git a/bridges/relays/bin-ethereum/src/ethereum_exchange.rs b/bridges/relays/bin-ethereum/src/ethereum_exchange.rs index 813eab0789..18470512b5 100644 --- a/bridges/relays/bin-ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/bin-ethereum/src/ethereum_exchange.rs @@ -66,7 +66,7 @@ pub struct EthereumExchangeParams { /// Relay working mode. pub mode: ExchangeRelayMode, /// Metrics parameters. - pub metrics_params: Option, + pub metrics_params: MetricsParams, /// Instance of the bridge pallet being synchronized. pub instance: Arc, } diff --git a/bridges/relays/bin-ethereum/src/ethereum_sync_loop.rs b/bridges/relays/bin-ethereum/src/ethereum_sync_loop.rs index 0fbe378259..3dcd27e18f 100644 --- a/bridges/relays/bin-ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/bin-ethereum/src/ethereum_sync_loop.rs @@ -72,7 +72,7 @@ pub struct EthereumSyncParams { /// Synchronization parameters. pub sync_params: HeadersSyncParams, /// Metrics parameters. - pub metrics_params: Option, + pub metrics_params: MetricsParams, /// Instance of the bridge pallet being synchronized. pub instance: Arc, } diff --git a/bridges/relays/bin-ethereum/src/main.rs b/bridges/relays/bin-ethereum/src/main.rs index d1171d1461..234e1237fc 100644 --- a/bridges/relays/bin-ethereum/src/main.rs +++ b/bridges/relays/bin-ethereum/src/main.rs @@ -34,7 +34,10 @@ use ethereum_sync_loop::EthereumSyncParams; use headers_relay::sync::TargetTransactionMode; use hex_literal::hex; use instances::{BridgeInstance, Kovan, RialtoPoA}; -use relay_utils::{initialize::initialize_relay, metrics::MetricsParams}; +use relay_utils::{ + initialize::initialize_relay, + metrics::{MetricsAddress, MetricsParams}, +}; use secp256k1::SecretKey; use sp_core::crypto::Pair; use substrate_sync_loop::SubstrateSyncParams; @@ -367,12 +370,12 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result Result, String> { +fn metrics_params(matches: &clap::ArgMatches) -> Result { if matches.is_present("no-prometheus") { - return Ok(None); + return Ok(None.into()); } - let mut metrics_params = MetricsParams::default(); + let mut metrics_params = MetricsAddress::default(); if let Some(prometheus_host) = matches.value_of("prometheus-host") { metrics_params.host = prometheus_host.into(); @@ -383,7 +386,7 @@ fn metrics_params(matches: &clap::ArgMatches) -> Result, S .map_err(|e| format!("Failed to parse prometheus-port: {}", e))?; } - Ok(Some(metrics_params)) + Ok(Some(metrics_params).into()) } fn instance_params(matches: &clap::ArgMatches) -> Result, String> { diff --git a/bridges/relays/bin-ethereum/src/substrate_sync_loop.rs b/bridges/relays/bin-ethereum/src/substrate_sync_loop.rs index 0765cca031..c0f44da3df 100644 --- a/bridges/relays/bin-ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/bin-ethereum/src/substrate_sync_loop.rs @@ -68,7 +68,7 @@ pub struct SubstrateSyncParams { /// Synchronization parameters. pub sync_params: HeadersSyncParams, /// Metrics parameters. - pub metrics_params: Option, + pub metrics_params: MetricsParams, } /// Substrate synchronization pipeline. diff --git a/bridges/relays/bin-substrate/src/cli/mod.rs b/bridges/relays/bin-substrate/src/cli/mod.rs index 216b778c21..ebceb3a25e 100644 --- a/bridges/relays/bin-substrate/src/cli/mod.rs +++ b/bridges/relays/bin-substrate/src/cli/mod.rs @@ -330,15 +330,16 @@ pub struct PrometheusParams { pub prometheus_port: u16, } -impl From for Option { - fn from(cli_params: PrometheusParams) -> Option { +impl From for relay_utils::metrics::MetricsParams { + fn from(cli_params: PrometheusParams) -> relay_utils::metrics::MetricsParams { if !cli_params.no_prometheus { - Some(relay_utils::metrics::MetricsParams { + Some(relay_utils::metrics::MetricsAddress { host: cli_params.prometheus_host, port: cli_params.prometheus_port, }) + .into() } else { - None + None.into() } } } diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers.rs b/bridges/relays/bin-substrate/src/cli/relay_headers.rs index 6c2b2f1d66..44b9b70d2b 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers.rs @@ -15,6 +15,7 @@ // along with Parity Bridges Common. If not, see . use crate::cli::{PrometheusParams, SourceConnectionParams, TargetConnectionParams, TargetSigningParams}; +use crate::finality_pipeline::SubstrateFinalitySyncPipeline; use structopt::{clap::arg_enum, StructOpt}; /// Start headers relayer process. @@ -77,12 +78,13 @@ impl RelayHeaders { let source_client = self.source.into_client::().await?; let target_client = self.target.into_client::().await?; let target_sign = self.target_sign.into_keypair::()?; + let metrics_params = Finality::customize_metrics(self.prometheus_params.into())?; crate::finality_pipeline::run( Finality::new(target_client.clone(), target_sign), source_client, target_client, - self.prometheus_params.into(), + metrics_params, ) .await }) diff --git a/bridges/relays/bin-substrate/src/finality_pipeline.rs b/bridges/relays/bin-substrate/src/finality_pipeline.rs index 0b035cd57f..bc8461f6a8 100644 --- a/bridges/relays/bin-substrate/src/finality_pipeline.rs +++ b/bridges/relays/bin-substrate/src/finality_pipeline.rs @@ -21,7 +21,7 @@ use crate::finality_target::SubstrateFinalityTarget; use bp_header_chain::justification::GrandpaJustification; use finality_relay::{FinalitySyncParams, FinalitySyncPipeline}; use relay_substrate_client::{finality_source::FinalitySource, BlockNumberOf, Chain, Client, HashOf, SyncHeader}; -use relay_utils::BlockNumberBase; +use relay_utils::{metrics::MetricsParams, BlockNumberBase}; use sp_core::Bytes; use std::{fmt::Debug, marker::PhantomData, time::Duration}; @@ -41,6 +41,11 @@ pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline { /// Chain with GRANDPA bridge pallet. type TargetChain: Chain; + /// Customize metrics exposed by headers sync loop. + fn customize_metrics(params: MetricsParams) -> anyhow::Result { + Ok(params) + } + /// Returns id of account that we're using to sign transactions at target chain. fn transactions_author(&self) -> ::AccountId; @@ -107,7 +112,7 @@ pub async fn run( pipeline: P, source_client: Client, target_client: Client, - metrics_params: Option, + metrics_params: MetricsParams, ) -> anyhow::Result<()> where P: SubstrateFinalitySyncPipeline< diff --git a/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs b/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs index ba68b9e490..0e80aacc17 100644 --- a/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs +++ b/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs @@ -29,7 +29,10 @@ use frame_support::dispatch::GetDispatchInfo; use messages_relay::message_lane::MessageLane; use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams}; use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams}; -use relay_substrate_client::{Chain, TransactionSignScheme}; +use relay_substrate_client::{ + metrics::{FloatStorageValueMetric, StorageProofOverheadMetric}, + Chain, TransactionSignScheme, +}; use relay_utils::metrics::MetricsParams; use sp_core::{Bytes, Pair}; use std::{ops::RangeInclusive, time::Duration}; @@ -135,7 +138,7 @@ pub async fn run( rialto_client: RialtoClient, rialto_sign: RialtoSigningParams, lane_id: LaneId, - metrics_params: Option, + metrics_params: MetricsParams, ) -> Result<(), String> { let stall_timeout = Duration::from_secs(5 * 60); let relayer_id_at_millau = millau_sign.public().as_array_ref().clone().into(); @@ -185,9 +188,27 @@ pub async fn run( max_messages_size_in_single_batch, }, }, - MillauSourceClient::new(millau_client, lane.clone(), lane_id, RIALTO_BRIDGE_INSTANCE), + MillauSourceClient::new(millau_client.clone(), lane.clone(), lane_id, RIALTO_BRIDGE_INSTANCE), RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE), - metrics_params, + relay_utils::relay_metrics( + messages_relay::message_lane_loop::metrics_prefix::(&lane_id), + metrics_params.address, + ) + .standalone_metric(StorageProofOverheadMetric::new( + millau_client.clone(), + (bp_runtime::RIALTO_BRIDGE_INSTANCE, lane_id), + millau_runtime::rialto_messages::inbound_lane_data_key(&lane_id), + "millau_storage_proof_overhead".into(), + "Millau storage proof overhead".into(), + ))? + .standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new( + millau_client, + sp_core::storage::StorageKey(millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec()), + Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE), + "millau_rialto_to_millau_conversion_rate".into(), + "Rialto to Millau tokens conversion rate (used by Rialto)".into(), + ))? + .into_params(), futures::future::pending(), ) .await diff --git a/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs b/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs index e29d23b015..ed231cd208 100644 --- a/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs +++ b/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs @@ -29,7 +29,10 @@ use frame_support::dispatch::GetDispatchInfo; use messages_relay::message_lane::MessageLane; use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams}; use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams}; -use relay_substrate_client::{Chain, TransactionSignScheme}; +use relay_substrate_client::{ + metrics::{FloatStorageValueMetric, StorageProofOverheadMetric}, + Chain, TransactionSignScheme, +}; use relay_utils::metrics::MetricsParams; use sp_core::{Bytes, Pair}; use std::{ops::RangeInclusive, time::Duration}; @@ -135,7 +138,7 @@ pub async fn run( millau_client: MillauClient, millau_sign: MillauSigningParams, lane_id: LaneId, - metrics_params: Option, + metrics_params: MetricsParams, ) -> Result<(), String> { let stall_timeout = Duration::from_secs(5 * 60); let relayer_id_at_rialto = rialto_sign.public().as_array_ref().clone().into(); @@ -184,9 +187,27 @@ pub async fn run( max_messages_size_in_single_batch, }, }, - RialtoSourceClient::new(rialto_client, lane.clone(), lane_id, MILLAU_BRIDGE_INSTANCE), + RialtoSourceClient::new(rialto_client.clone(), lane.clone(), lane_id, MILLAU_BRIDGE_INSTANCE), MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE), - metrics_params, + relay_utils::relay_metrics( + messages_relay::message_lane_loop::metrics_prefix::(&lane_id), + metrics_params.address, + ) + .standalone_metric(StorageProofOverheadMetric::new( + rialto_client.clone(), + (bp_runtime::MILLAU_BRIDGE_INSTANCE, lane_id), + rialto_runtime::millau_messages::inbound_lane_data_key(&lane_id), + "rialto_storage_proof_overhead".into(), + "Rialto storage proof overhead".into(), + ))? + .standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new( + rialto_client, + sp_core::storage::StorageKey(rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec()), + Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE), + "rialto_millau_to_rialto_conversion_rate".into(), + "Millau to Rialto tokens conversion rate (used by Millau)".into(), + ))? + .into_params(), futures::future::pending(), ) .await diff --git a/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs b/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs index 2ba9069b25..ac69a7437e 100644 --- a/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs +++ b/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs @@ -22,6 +22,7 @@ use bp_header_chain::justification::GrandpaJustification; use codec::Encode; use relay_millau_client::{Millau, SigningParams as MillauSigningParams}; use relay_substrate_client::{Chain, TransactionSignScheme}; +use relay_utils::metrics::{FloatJsonValueMetric, MetricsParams}; use relay_westend_client::{SyncHeader as WestendSyncHeader, Westend}; use sp_core::{Bytes, Pair}; @@ -33,6 +34,29 @@ impl SubstrateFinalitySyncPipeline for WestendFinalityToMillau { type TargetChain = Millau; + fn customize_metrics(params: MetricsParams) -> anyhow::Result { + Ok( + relay_utils::relay_metrics(finality_relay::metrics_prefix::(), params.address) + // Polkadot/Kusama prices are added as metrics here, because atm we don't have Polkadot <-> Kusama + // relays, but we want to test metrics/dashboards in advance + .standalone_metric(FloatJsonValueMetric::new( + "https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=usd".into(), + "$.polkadot.usd".into(), + "polkadot_price".into(), + "Polkadot price in USD".into(), + )) + .map_err(|e| anyhow::format_err!("{}", e))? + .standalone_metric(FloatJsonValueMetric::new( + "https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=usd".into(), + "$.kusama.usd".into(), + "kusama_price".into(), + "Kusama price in USD".into(), + )) + .map_err(|e| anyhow::format_err!("{}", e))? + .into_params(), + ) + } + fn transactions_author(&self) -> bp_millau::AccountId { self.target_sign.public().as_array_ref().clone().into() } diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs index 1a06582ec1..07e0a9e17c 100644 --- a/bridges/relays/client-substrate/src/chain.rs +++ b/bridges/relays/client-substrate/src/chain.rs @@ -29,7 +29,7 @@ use sp_runtime::{ use std::{fmt::Debug, time::Duration}; /// Substrate-based chain from minimal relay-client point of view. -pub trait Chain: ChainBase { +pub trait Chain: ChainBase + Clone { /// Chain name. const NAME: &'static str; /// Average block interval. diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index 289118753c..a149ba6970 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -29,7 +29,7 @@ use jsonrpsee_types::{jsonrpc::DeserializeOwned, traits::SubscriptionClient}; use jsonrpsee_ws_client::{WsClient as RpcClient, WsConfig as RpcConfig, WsSubscription as Subscription}; use num_traits::Zero; use pallet_balances::AccountData; -use sp_core::Bytes; +use sp_core::{storage::StorageKey, Bytes}; use sp_trie::StorageProof; use sp_version::RuntimeVersion; use std::ops::RangeInclusive; @@ -177,6 +177,14 @@ impl Client { Ok(Substrate::::runtime_version(&self.client).await?) } + /// Read value from runtime storage. + pub async fn storage_value(&self, storage_key: StorageKey) -> Result> { + Substrate::::get_storage(&self.client, storage_key) + .await? + .map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed)) + .transpose() + } + /// Return native tokens balance of the account. pub async fn free_native_balance(&self, account: C::AccountId) -> Result where diff --git a/bridges/relays/client-substrate/src/error.rs b/bridges/relays/client-substrate/src/error.rs index 899ce4a2b2..db1cdb4374 100644 --- a/bridges/relays/client-substrate/src/error.rs +++ b/bridges/relays/client-substrate/src/error.rs @@ -38,6 +38,8 @@ pub enum Error { AccountDoesNotExist, /// The client we're connected to is not synced, so we can't rely on its state. ClientNotSynced(Health), + /// An error has happened when we have tried to parse storage proof. + StorageProofError(bp_runtime::StorageProofError), /// Custom logic error. Custom(String), } @@ -50,6 +52,7 @@ impl std::error::Error for Error { Self::UninitializedBridgePallet => None, Self::AccountDoesNotExist => None, Self::ClientNotSynced(_) => None, + Self::StorageProofError(_) => None, Self::Custom(_) => None, } } @@ -81,6 +84,7 @@ impl std::fmt::Display for Error { Self::ResponseParseFailed(e) => e.to_string(), Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(), Self::AccountDoesNotExist => "Account does not exist on the chain".into(), + Self::StorageProofError(e) => format!("Error when parsing storage proof: {:?}", e), Self::ClientNotSynced(health) => format!("Substrate client is not synced: {}", health), Self::Custom(e) => e.clone(), }; diff --git a/bridges/relays/client-substrate/src/guard.rs b/bridges/relays/client-substrate/src/guard.rs index 66a6fbdee7..3bd3df9098 100644 --- a/bridges/relays/client-substrate/src/guard.rs +++ b/bridges/relays/client-substrate/src/guard.rs @@ -172,6 +172,7 @@ mod tests { SinkExt, }; + #[derive(Debug, Clone)] struct TestChain; impl bp_runtime::Chain for TestChain { diff --git a/bridges/relays/client-substrate/src/lib.rs b/bridges/relays/client-substrate/src/lib.rs index 9190e17247..0f1bfb481e 100644 --- a/bridges/relays/client-substrate/src/lib.rs +++ b/bridges/relays/client-substrate/src/lib.rs @@ -27,6 +27,7 @@ mod sync_header; pub mod finality_source; pub mod guard; pub mod headers_source; +pub mod metrics; pub use crate::chain::{BlockWithJustification, Chain, ChainWithBalances, TransactionSignScheme}; pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet}; diff --git a/bridges/relays/client-substrate/src/metrics/float_storage_value.rs b/bridges/relays/client-substrate/src/metrics/float_storage_value.rs new file mode 100644 index 0000000000..e1647d4bcd --- /dev/null +++ b/bridges/relays/client-substrate/src/metrics/float_storage_value.rs @@ -0,0 +1,91 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::chain::Chain; +use crate::client::Client; + +use async_trait::async_trait; +use codec::Decode; +use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64}; +use sp_core::storage::StorageKey; +use sp_runtime::{traits::UniqueSaturatedInto, FixedPointNumber}; +use std::time::Duration; + +/// Storage value update interval (in blocks). +const UPDATE_INTERVAL_IN_BLOCKS: u32 = 5; + +/// Metric that represents fixed-point runtime storage value as float gauge. +#[derive(Clone, Debug)] +pub struct FloatStorageValueMetric { + client: Client, + storage_key: StorageKey, + maybe_default_value: Option, + metric: Gauge, +} + +impl FloatStorageValueMetric { + /// Create new metric. + pub fn new( + client: Client, + storage_key: StorageKey, + maybe_default_value: Option, + name: String, + help: String, + ) -> Self { + FloatStorageValueMetric { + client, + storage_key, + maybe_default_value, + metric: Gauge::new(name, help).expect( + "only fails if gauge options are customized;\ + we use default options;\ + qed", + ), + } + } +} + +impl Metrics for FloatStorageValueMetric { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.metric.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + +#[async_trait] +impl StandaloneMetrics for FloatStorageValueMetric +where + T: 'static + Decode + Send + Sync + FixedPointNumber, +{ + fn update_interval(&self) -> Duration { + C::AVERAGE_BLOCK_INTERVAL * UPDATE_INTERVAL_IN_BLOCKS + } + + async fn update(&self) { + relay_utils::metrics::set_gauge_value( + &self.metric, + self.client + .storage_value::(self.storage_key.clone()) + .await + .map(|maybe_storage_value| { + maybe_storage_value.or(self.maybe_default_value).map(|storage_value| { + storage_value.into_inner().unique_saturated_into() as f64 + / T::DIV.unique_saturated_into() as f64 + }) + }), + ); + } +} diff --git a/bridges/relays/client-substrate/src/metrics/mod.rs b/bridges/relays/client-substrate/src/metrics/mod.rs new file mode 100644 index 0000000000..177e2a709c --- /dev/null +++ b/bridges/relays/client-substrate/src/metrics/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Contains several Substrate-specific metrics that may be exposed by relay. + +pub use float_storage_value::FloatStorageValueMetric; +pub use storage_proof_overhead::StorageProofOverheadMetric; + +mod float_storage_value; +mod storage_proof_overhead; diff --git a/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs b/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs new file mode 100644 index 0000000000..167f59605d --- /dev/null +++ b/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs @@ -0,0 +1,135 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::chain::Chain; +use crate::client::Client; +use crate::error::Error; + +use async_trait::async_trait; +use bp_messages::LaneId; +use bp_runtime::InstanceId; +use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, U64}; +use sp_core::storage::StorageKey; +use sp_runtime::traits::Header as HeaderT; +use sp_trie::StorageProof; +use std::time::Duration; + +/// Storage proof overhead update interval (in blocks). +const UPDATE_INTERVAL_IN_BLOCKS: u32 = 100; + +/// Metric that represents extra size of storage proof as unsigned integer gauge. +/// +/// Regular Substrate node does not provide any RPC endpoints that return storage proofs. +/// So here we're using our own `pallet-bridge-messages-rpc` RPC API, which returns proof +/// of the inbound message lane state. Then we simply subtract size of this state from +/// the size of storage proof to compute metric value. +/// +/// There are two things to keep in mind when using this metric: +/// +/// 1) it'll only work on inbound lanes that have already accepted at least one message; +/// 2) the overhead may be slightly different for other values, but this metric gives a good estimation. +#[derive(Debug)] +pub struct StorageProofOverheadMetric { + client: Client, + inbound_lane: (InstanceId, LaneId), + inbound_lane_data_key: StorageKey, + metric: Gauge, +} + +impl Clone for StorageProofOverheadMetric { + fn clone(&self) -> Self { + StorageProofOverheadMetric { + client: self.client.clone(), + inbound_lane: self.inbound_lane, + inbound_lane_data_key: self.inbound_lane_data_key.clone(), + metric: self.metric.clone(), + } + } +} + +impl StorageProofOverheadMetric { + /// Create new metric instance with given name and help. + pub fn new( + client: Client, + inbound_lane: (InstanceId, LaneId), + inbound_lane_data_key: StorageKey, + name: String, + help: String, + ) -> Self { + StorageProofOverheadMetric { + client, + inbound_lane, + inbound_lane_data_key, + metric: Gauge::new(name, help).expect( + "only fails if gauge options are customized;\ + we use default options;\ + qed", + ), + } + } + + /// Returns approximate storage proof size overhead. + /// + /// Returs `Ok(None)` if inbound lane we're watching for has no state. This shouldn't be treated as error. + async fn compute_storage_proof_overhead(&self) -> Result, Error> { + let best_header_hash = self.client.best_finalized_header_hash().await?; + let best_header = self.client.header_by_hash(best_header_hash).await?; + + let storage_proof = self + .client + .prove_messages_delivery(self.inbound_lane.0, self.inbound_lane.1, best_header_hash) + .await?; + let storage_proof_size: usize = storage_proof.iter().map(|n| n.len()).sum(); + + let storage_value_reader = bp_runtime::StorageProofChecker::::new( + *best_header.state_root(), + StorageProof::new(storage_proof), + ) + .map_err(Error::StorageProofError)?; + let maybe_encoded_storage_value = storage_value_reader + .read_value(&self.inbound_lane_data_key.0) + .map_err(Error::StorageProofError)?; + let encoded_storage_value_size = match maybe_encoded_storage_value { + Some(encoded_storage_value) => encoded_storage_value.len(), + None => return Ok(None), + }; + + Ok(Some(storage_proof_size - encoded_storage_value_size)) + } +} + +impl Metrics for StorageProofOverheadMetric { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.metric.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + +#[async_trait] +impl StandaloneMetrics for StorageProofOverheadMetric { + fn update_interval(&self) -> Duration { + C::AVERAGE_BLOCK_INTERVAL * UPDATE_INTERVAL_IN_BLOCKS + } + + async fn update(&self) { + relay_utils::metrics::set_gauge_value( + &self.metric, + self.compute_storage_proof_overhead() + .await + .map(|v| v.map(|overhead| overhead as u64)), + ); + } +} diff --git a/bridges/relays/exchange/src/exchange_loop.rs b/bridges/relays/exchange/src/exchange_loop.rs index 3ef6503353..7bb4abb746 100644 --- a/bridges/relays/exchange/src/exchange_loop.rs +++ b/bridges/relays/exchange/src/exchange_loop.rs @@ -83,16 +83,19 @@ pub async fn run( storage: impl TransactionProofsRelayStorage>, source_client: impl SourceClient

, target_client: impl TargetClient

, - metrics_params: Option, + metrics_params: MetricsParams, exit_signal: impl Future, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) - .with_metrics(format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME)) + .with_metrics( + format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME), + metrics_params, + ) .loop_metric(ExchangeLoopMetrics::default())? .standalone_metric(GlobalMetrics::default())? - .expose(metrics_params) + .expose() .await? .run(|source_client, target_client, metrics| { run_until_connection_lost( @@ -303,7 +306,7 @@ mod tests { storage, source, target, - None, + MetricsParams::disabled(), exit_receiver.into_future().map(|(_, _)| ()), )); } diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index 512c51b4b2..e0ad8a7c1b 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -90,20 +90,25 @@ pub trait TargetClient: RelayClient { async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), Self::Error>; } +/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop. +pub fn metrics_prefix() -> String { + format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME) +} + /// Run finality proofs synchronization loop. pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, - metrics_params: Option, + metrics_params: MetricsParams, exit_signal: impl Future, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) - .with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)) + .with_metrics(metrics_prefix::

(), metrics_params) .loop_metric(SyncLoopMetrics::default())? .standalone_metric(GlobalMetrics::default())? - .expose(metrics_params) + .expose() .await? .run(|source_client, target_client, metrics| { run_until_connection_lost( diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index e19f924376..eedd902003 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -27,7 +27,7 @@ use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; use async_trait::async_trait; use futures::{FutureExt, Stream, StreamExt}; use parking_lot::Mutex; -use relay_utils::{relay_loop::Client as RelayClient, MaybeConnectionError}; +use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, MaybeConnectionError}; use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; type IsMandatory = bool; @@ -206,7 +206,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync source_client, target_client, sync_params, - None, + MetricsParams::disabled(), exit_receiver.into_future().map(|(_, _)| ()), )); diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs index f587749976..d5048aa160 100644 --- a/bridges/relays/finality/src/lib.rs +++ b/bridges/relays/finality/src/lib.rs @@ -19,7 +19,7 @@ //! are still submitted to the target node, but are treated as auxiliary data as we are not trying //! to submit all source headers to the target node. -pub use crate::finality_loop::{run, FinalitySyncParams, SourceClient, TargetClient}; +pub use crate::finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient}; use bp_header_chain::FinalityProof; use std::fmt::Debug; diff --git a/bridges/relays/headers/src/sync_loop.rs b/bridges/relays/headers/src/sync_loop.rs index 0e535d8caa..2e0d12ec87 100644 --- a/bridges/relays/headers/src/sync_loop.rs +++ b/bridges/relays/headers/src/sync_loop.rs @@ -119,15 +119,15 @@ pub async fn run>( target_tick: Duration, sync_maintain: impl SyncMaintain

, sync_params: HeadersSyncParams, - metrics_params: Option, + metrics_params: MetricsParams, exit_signal: impl Future, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) - .with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)) + .with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), metrics_params) .loop_metric(SyncLoopMetrics::default())? .standalone_metric(GlobalMetrics::default())? - .expose(metrics_params) + .expose() .await? .run(|source_client, target_client, metrics| { run_until_connection_lost( diff --git a/bridges/relays/headers/src/sync_loop_tests.rs b/bridges/relays/headers/src/sync_loop_tests.rs index ff56dc45d2..3347c4d0d3 100644 --- a/bridges/relays/headers/src/sync_loop_tests.rs +++ b/bridges/relays/headers/src/sync_loop_tests.rs @@ -24,7 +24,8 @@ use backoff::backoff::Backoff; use futures::{future::FutureExt, stream::StreamExt}; use parking_lot::Mutex; use relay_utils::{ - process_future_result, relay_loop::Client as RelayClient, retry_backoff, HeaderId, MaybeConnectionError, + metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient, retry_backoff, HeaderId, + MaybeConnectionError, }; use std::{ collections::{HashMap, HashSet}, @@ -500,7 +501,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) { test_tick(), (), crate::sync::tests::default_sync_params(), - None, + MetricsParams::disabled(), exit_receiver.into_future().map(|(_, _)| ()), )); } diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index 24a5314d23..daa74b5073 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -211,26 +211,31 @@ pub struct ClientsState { pub target: Option>, } +/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop. +pub fn metrics_prefix(lane: &LaneId) -> String { + format!( + "{}_to_{}_MessageLane_{}", + P::SOURCE_NAME, + P::TARGET_NAME, + hex::encode(lane) + ) +} + /// Run message lane service loop. pub async fn run( params: Params, source_client: impl SourceClient

, target_client: impl TargetClient

, - metrics_params: Option, + metrics_params: MetricsParams, exit_signal: impl Future, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) .reconnect_delay(params.reconnect_delay) - .with_metrics(format!( - "{}_to_{}_MessageLane_{}", - P::SOURCE_NAME, - P::TARGET_NAME, - hex::encode(params.lane) - )) + .with_metrics(metrics_prefix::

(¶ms.lane), metrics_params) .loop_metric(MessageLaneLoopMetrics::default())? .standalone_metric(GlobalMetrics::default())? - .expose(metrics_params) + .expose() .await? .run(|source_client, target_client, metrics| { run_until_connection_lost( @@ -722,7 +727,7 @@ pub(crate) mod tests { }, source_client, target_client, - None, + MetricsParams::disabled(), exit_signal, ) .await; diff --git a/bridges/relays/utils/Cargo.toml b/bridges/relays/utils/Cargo.toml index ce6a20bbc4..ff80cab533 100644 --- a/bridges/relays/utils/Cargo.toml +++ b/bridges/relays/utils/Cargo.toml @@ -10,10 +10,13 @@ ansi_term = "0.12" async-std = "1.6.5" async-trait = "0.1.40" backoff = "0.2" +isahc = "1.2" env_logger = "0.8.2" futures = "0.3.5" +jsonpath_lib = "0.2" log = "0.4.11" num-traits = "0.2" +serde_json = "1.0" sysinfo = "0.15" time = "0.2" diff --git a/bridges/relays/utils/src/lib.rs b/bridges/relays/utils/src/lib.rs index ab913b758d..446e00cd23 100644 --- a/bridges/relays/utils/src/lib.rs +++ b/bridges/relays/utils/src/lib.rs @@ -16,7 +16,7 @@ //! Utilities used by different relays. -pub use relay_loop::relay_loop; +pub use relay_loop::{relay_loop, relay_metrics}; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::future::FutureExt; diff --git a/bridges/relays/utils/src/metrics.rs b/bridges/relays/utils/src/metrics.rs index 41df88ecd0..789ff8a108 100644 --- a/bridges/relays/utils/src/metrics.rs +++ b/bridges/relays/utils/src/metrics.rs @@ -14,23 +14,37 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +pub use float_json_value::FloatJsonValueMetric; pub use global::GlobalMetrics; -pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64}; +pub use substrate_prometheus_endpoint::{ + prometheus::core::{Atomic, Collector}, + register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64, +}; use async_trait::async_trait; -use std::time::Duration; +use std::{fmt::Debug, time::Duration}; +mod float_json_value; mod global; -/// Prometheus endpoint MetricsParams. +/// Unparsed address that needs to be used to expose Prometheus metrics. #[derive(Debug, Clone)] -pub struct MetricsParams { +pub struct MetricsAddress { /// Serve HTTP requests at given host. pub host: String, /// Serve HTTP requests at given port. pub port: u16, } +/// Prometheus endpoint MetricsParams. +#[derive(Debug, Clone)] +pub struct MetricsParams { + /// Interface and TCP port to be used when exposing Prometheus metrics. + pub address: Option, + /// Metrics registry. May be `Some(_)` if several components share the same endpoint. + pub registry: Option, +} + /// Metrics API. pub trait Metrics: Clone + Send + Sync + 'static { /// Register metrics in the registry. @@ -61,11 +75,64 @@ pub trait StandaloneMetrics: Metrics { } } -impl Default for MetricsParams { +impl Default for MetricsAddress { fn default() -> Self { - MetricsParams { + MetricsAddress { host: "127.0.0.1".into(), port: 9616, } } } + +impl MetricsParams { + /// Creates metrics params so that metrics are not exposed. + pub fn disabled() -> Self { + MetricsParams { + address: None, + registry: None, + } + } +} + +impl From> for MetricsParams { + fn from(address: Option) -> Self { + MetricsParams { + address, + registry: None, + } + } +} + +/// Set value of gauge metric. +/// +/// If value is `Ok(None)` or `Err(_)`, metric would have default value. +pub fn set_gauge_value, E: Debug>(gauge: &Gauge, value: Result, E>) { + gauge.set(match value { + Ok(Some(value)) => { + log::trace!( + target: "bridge-metrics", + "Updated value of metric '{:?}': {:?}", + gauge.desc().first().map(|d| &d.fq_name), + value, + ); + value + } + Ok(None) => { + log::warn!( + target: "bridge-metrics", + "Failed to update metric '{:?}': value is empty", + gauge.desc().first().map(|d| &d.fq_name), + ); + Default::default() + } + Err(error) => { + log::warn!( + target: "bridge-metrics", + "Failed to update metric '{:?}': {:?}", + gauge.desc().first().map(|d| &d.fq_name), + error, + ); + Default::default() + } + }) +} diff --git a/bridges/relays/utils/src/metrics/float_json_value.rs b/bridges/relays/utils/src/metrics/float_json_value.rs new file mode 100644 index 0000000000..902b3e8195 --- /dev/null +++ b/bridges/relays/utils/src/metrics/float_json_value.rs @@ -0,0 +1,125 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64}; + +use async_trait::async_trait; +use std::time::Duration; + +/// Value update interval. +const UPDATE_INTERVAL: Duration = Duration::from_secs(60); + +/// Metric that represents float value received from HTTP service as float gauge. +#[derive(Debug, Clone)] +pub struct FloatJsonValueMetric { + url: String, + json_path: String, + metric: Gauge, +} + +impl FloatJsonValueMetric { + /// Create new metric instance with given name and help. + pub fn new(url: String, json_path: String, name: String, help: String) -> Self { + FloatJsonValueMetric { + url, + json_path, + metric: Gauge::new(name, help).expect( + "only fails if gauge options are customized;\ + we use default options;\ + qed", + ), + } + } + + /// Read value from HTTP service. + async fn read_value(&self) -> Result { + use isahc::{AsyncReadResponseExt, HttpClient, Request}; + + fn map_isahc_err(err: impl std::fmt::Display) -> String { + format!("Failed to fetch token price from remote server: {}", err) + } + + let request = Request::get(&self.url) + .header("Accept", "application/json") + .body(()) + .map_err(map_isahc_err)?; + let raw_response = HttpClient::new() + .map_err(map_isahc_err)? + .send_async(request) + .await + .map_err(map_isahc_err)? + .text() + .await + .map_err(map_isahc_err)?; + + parse_service_response(&self.json_path, &raw_response) + } +} + +impl Metrics for FloatJsonValueMetric { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.metric.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + +#[async_trait] +impl StandaloneMetrics for FloatJsonValueMetric { + fn update_interval(&self) -> Duration { + UPDATE_INTERVAL + } + + async fn update(&self) { + crate::metrics::set_gauge_value(&self.metric, self.read_value().await.map(Some)); + } +} + +/// Parse HTTP service response. +fn parse_service_response(json_path: &str, response: &str) -> Result { + let json = serde_json::from_str(response).map_err(|err| { + format!( + "Failed to parse HTTP service response: {:?}. Response: {:?}", + err, response, + ) + })?; + + let mut selector = jsonpath_lib::selector(&json); + let maybe_selected_value = selector(json_path).map_err(|err| { + format!( + "Failed to select value from response: {:?}. Response: {:?}", + err, response, + ) + })?; + let selected_value = maybe_selected_value + .first() + .and_then(|v| v.as_f64()) + .ok_or_else(|| format!("Missing required value from response: {:?}", response,))?; + + Ok(selected_value) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_service_response_works() { + assert_eq!( + parse_service_response("$.kusama.usd", r#"{"kusama":{"usd":433.05}}"#).map_err(drop), + Ok(433.05), + ); + } +} diff --git a/bridges/relays/utils/src/metrics/global.rs b/bridges/relays/utils/src/metrics/global.rs index c19ac8d468..98c7d9a570 100644 --- a/bridges/relays/utils/src/metrics/global.rs +++ b/bridges/relays/utils/src/metrics/global.rs @@ -16,12 +16,11 @@ //! Global system-wide Prometheus metrics exposed by relays. -use crate::metrics::{Metrics, StandaloneMetrics}; +use crate::metrics::{register, Gauge, GaugeVec, Metrics, Opts, Registry, StandaloneMetrics, F64, U64}; use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use std::time::Duration; -use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64}; use sysinfo::{ProcessExt, RefreshKind, System, SystemExt}; /// Global metrics update interval. @@ -79,7 +78,7 @@ impl StandaloneMetrics for GlobalMetrics { } _ => { log::warn!( - target: "bridge", + target: "bridge-metrics", "Failed to refresh process information. Metrics may show obsolete values", ); } diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index 04077bf08a..1a8d63e1db 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::metrics::{Metrics, MetricsParams, StandaloneMetrics}; +use crate::metrics::{Metrics, MetricsAddress, MetricsParams, StandaloneMetrics}; use crate::{FailedClient, MaybeConnectionError}; use async_trait::async_trait; @@ -44,6 +44,24 @@ pub fn relay_loop(source_client: SC, target_client: TC) -> Loop) -> LoopMetrics<(), (), ()> { + assert!(!prefix.is_empty(), "Metrics prefix can not be empty"); + + LoopMetrics { + relay_loop: Loop { + reconnect_delay: RECONNECT_DELAY, + source_client: (), + target_client: (), + loop_metric: None, + }, + address, + registry: Registry::new_custom(Some(prefix), None) + .expect("only fails if prefix is empty; prefix is not empty; qed"), + loop_metric: None, + } +} + /// Generic relay loop. pub struct Loop { reconnect_delay: Duration, @@ -55,6 +73,7 @@ pub struct Loop { /// Relay loop metrics builder. pub struct LoopMetrics { relay_loop: Loop, + address: Option, registry: Registry, loop_metric: Option, } @@ -69,7 +88,7 @@ impl Loop { /// Start building loop metrics using given prefix. /// /// Panics if `prefix` is empty. - pub fn with_metrics(self, prefix: String) -> LoopMetrics { + pub fn with_metrics(self, prefix: String, params: MetricsParams) -> LoopMetrics { assert!(!prefix.is_empty(), "Metrics prefix can not be empty"); LoopMetrics { @@ -79,8 +98,12 @@ impl Loop { target_client: self.target_client, loop_metric: None, }, - registry: Registry::new_custom(Some(prefix), None) - .expect("only fails if prefix is empty; prefix is not empty; qed"), + address: params.address, + registry: match params.registry { + Some(registry) => registry, + None => Registry::new_custom(Some(prefix), None) + .expect("only fails if prefix is empty; prefix is not empty; qed"), + }, loop_metric: None, } } @@ -159,6 +182,7 @@ impl LoopMetrics { Ok(LoopMetrics { relay_loop: self.relay_loop, + address: self.address, registry: self.registry, loop_metric: Some(loop_metric), }) @@ -171,19 +195,27 @@ impl LoopMetrics { Ok(self) } - /// Expose metrics using given params. + /// Convert into `MetricsParams` structure so that metrics registry may be extended later. + pub fn into_params(self) -> MetricsParams { + MetricsParams { + address: self.address, + registry: Some(self.registry), + } + } + + /// Expose metrics using address passed at creation. /// - /// If `params` is `None`, metrics are not exposed. - pub async fn expose(self, params: Option) -> Result, String> { - if let Some(params) = params { + /// If passed `address` is `None`, metrics are not exposed. + pub async fn expose(self) -> Result, String> { + if let Some(address) = self.address { let socket_addr = SocketAddr::new( - params.host.parse().map_err(|err| { + address.host.parse().map_err(|err| { format!( "Invalid host {} is used to expose Prometheus metrics: {}", - params.host, err, + address.host, err, ) })?, - params.port, + address.port, ); let registry = self.registry;