Pre-create metrics registry before loop is started + administrative metrics (#848)

* administrative metrics

* fmt

* fix compilation

* fix compilation again

* and another one

* remove GenericLoopMetrics

* chttp -> isahc

* remove redundant marker

* not about price metrics

* fmt
This commit is contained in:
Svyatoslav Nikolsky
2021-04-06 12:54:15 +03:00
committed by Bastian Köcher
parent 21baffc832
commit cb90ea0979
34 changed files with 659 additions and 73 deletions
@@ -35,9 +35,12 @@ use sp_core::storage::StorageKey;
use sp_runtime::{FixedPointNumber, FixedU128};
use sp_std::{convert::TryFrom, ops::RangeInclusive};
/// Initial value of `RialtoToMillauConversionRate` parameter.
pub const INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE: FixedU128 = FixedU128::from_inner(FixedU128::DIV);
parameter_types! {
/// Rialto to Millau conversion rate. Initially we treat both tokens as equal.
storage RialtoToMillauConversionRate: FixedU128 = FixedU128::one();
pub storage RialtoToMillauConversionRate: FixedU128 = INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE;
}
/// Storage key of the Millau -> Rialto message in the runtime storage.
@@ -35,9 +35,12 @@ use sp_core::storage::StorageKey;
use sp_runtime::{FixedPointNumber, FixedU128};
use sp_std::{convert::TryFrom, ops::RangeInclusive};
/// Initial value of `MillauToRialtoConversionRate` parameter.
pub const INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE: FixedU128 = FixedU128::from_inner(FixedU128::DIV);
parameter_types! {
/// Millau to Rialto conversion rate. Initially we treat both tokens as equal.
storage MillauToRialtoConversionRate: FixedU128 = 1.into();
pub storage MillauToRialtoConversionRate: FixedU128 = INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE;
}
/// Storage key of the Rialto -> Millau message in the runtime storage.
+1 -1
View File
@@ -24,7 +24,7 @@ use sp_io::hashing::blake2_256;
use sp_std::convert::TryFrom;
pub use chain::{BlockNumberOf, Chain, HashOf, HasherOf, HeaderOf};
pub use storage_proof::StorageProofChecker;
pub use storage_proof::{Error as StorageProofError, StorageProofChecker};
#[cfg(feature = "std")]
pub use storage_proof::craft_valid_storage_proof;
@@ -66,7 +66,7 @@ pub struct EthereumExchangeParams {
/// Relay working mode.
pub mode: ExchangeRelayMode,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
pub metrics_params: MetricsParams,
/// Instance of the bridge pallet being synchronized.
pub instance: Arc<dyn BridgeInstance>,
}
@@ -72,7 +72,7 @@ pub struct EthereumSyncParams {
/// Synchronization parameters.
pub sync_params: HeadersSyncParams,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
pub metrics_params: MetricsParams,
/// Instance of the bridge pallet being synchronized.
pub instance: Arc<dyn BridgeInstance>,
}
+8 -5
View File
@@ -34,7 +34,10 @@ use ethereum_sync_loop::EthereumSyncParams;
use headers_relay::sync::TargetTransactionMode;
use hex_literal::hex;
use instances::{BridgeInstance, Kovan, RialtoPoA};
use relay_utils::{initialize::initialize_relay, metrics::MetricsParams};
use relay_utils::{
initialize::initialize_relay,
metrics::{MetricsAddress, MetricsParams},
};
use secp256k1::SecretKey;
use sp_core::crypto::Pair;
use substrate_sync_loop::SubstrateSyncParams;
@@ -367,12 +370,12 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result<EthereumExchan
Ok(params)
}
fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<MetricsParams>, String> {
fn metrics_params(matches: &clap::ArgMatches) -> Result<MetricsParams, String> {
if matches.is_present("no-prometheus") {
return Ok(None);
return Ok(None.into());
}
let mut metrics_params = MetricsParams::default();
let mut metrics_params = MetricsAddress::default();
if let Some(prometheus_host) = matches.value_of("prometheus-host") {
metrics_params.host = prometheus_host.into();
@@ -383,7 +386,7 @@ fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<MetricsParams>, S
.map_err(|e| format!("Failed to parse prometheus-port: {}", e))?;
}
Ok(Some(metrics_params))
Ok(Some(metrics_params).into())
}
fn instance_params(matches: &clap::ArgMatches) -> Result<Arc<dyn BridgeInstance>, String> {
@@ -68,7 +68,7 @@ pub struct SubstrateSyncParams {
/// Synchronization parameters.
pub sync_params: HeadersSyncParams,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
pub metrics_params: MetricsParams,
}
/// Substrate synchronization pipeline.
+5 -4
View File
@@ -330,15 +330,16 @@ pub struct PrometheusParams {
pub prometheus_port: u16,
}
impl From<PrometheusParams> for Option<relay_utils::metrics::MetricsParams> {
fn from(cli_params: PrometheusParams) -> Option<relay_utils::metrics::MetricsParams> {
impl From<PrometheusParams> for relay_utils::metrics::MetricsParams {
fn from(cli_params: PrometheusParams) -> relay_utils::metrics::MetricsParams {
if !cli_params.no_prometheus {
Some(relay_utils::metrics::MetricsParams {
Some(relay_utils::metrics::MetricsAddress {
host: cli_params.prometheus_host,
port: cli_params.prometheus_port,
})
.into()
} else {
None
None.into()
}
}
}
@@ -15,6 +15,7 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::cli::{PrometheusParams, SourceConnectionParams, TargetConnectionParams, TargetSigningParams};
use crate::finality_pipeline::SubstrateFinalitySyncPipeline;
use structopt::{clap::arg_enum, StructOpt};
/// Start headers relayer process.
@@ -77,12 +78,13 @@ impl RelayHeaders {
let source_client = self.source.into_client::<Source>().await?;
let target_client = self.target.into_client::<Target>().await?;
let target_sign = self.target_sign.into_keypair::<Target>()?;
let metrics_params = Finality::customize_metrics(self.prometheus_params.into())?;
crate::finality_pipeline::run(
Finality::new(target_client.clone(), target_sign),
source_client,
target_client,
self.prometheus_params.into(),
metrics_params,
)
.await
})
@@ -21,7 +21,7 @@ use crate::finality_target::SubstrateFinalityTarget;
use bp_header_chain::justification::GrandpaJustification;
use finality_relay::{FinalitySyncParams, FinalitySyncPipeline};
use relay_substrate_client::{finality_source::FinalitySource, BlockNumberOf, Chain, Client, HashOf, SyncHeader};
use relay_utils::BlockNumberBase;
use relay_utils::{metrics::MetricsParams, BlockNumberBase};
use sp_core::Bytes;
use std::{fmt::Debug, marker::PhantomData, time::Duration};
@@ -41,6 +41,11 @@ pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline {
/// Chain with GRANDPA bridge pallet.
type TargetChain: Chain;
/// Customize metrics exposed by headers sync loop.
fn customize_metrics(params: MetricsParams) -> anyhow::Result<MetricsParams> {
Ok(params)
}
/// Returns id of account that we're using to sign transactions at target chain.
fn transactions_author(&self) -> <Self::TargetChain as Chain>::AccountId;
@@ -107,7 +112,7 @@ pub async fn run<SourceChain, TargetChain, P>(
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
metrics_params: MetricsParams,
) -> anyhow::Result<()>
where
P: SubstrateFinalitySyncPipeline<
@@ -29,7 +29,10 @@ use frame_support::dispatch::GetDispatchInfo;
use messages_relay::message_lane::MessageLane;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
@@ -135,7 +138,7 @@ pub async fn run(
rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams,
lane_id: LaneId,
metrics_params: Option<MetricsParams>,
metrics_params: MetricsParams,
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_millau = millau_sign.public().as_array_ref().clone().into();
@@ -185,9 +188,27 @@ pub async fn run(
max_messages_size_in_single_batch,
},
},
MillauSourceClient::new(millau_client, lane.clone(), lane_id, RIALTO_BRIDGE_INSTANCE),
MillauSourceClient::new(millau_client.clone(), lane.clone(), lane_id, RIALTO_BRIDGE_INSTANCE),
RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE),
metrics_params,
relay_utils::relay_metrics(
messages_relay::message_lane_loop::metrics_prefix::<MillauMessagesToRialto>(&lane_id),
metrics_params.address,
)
.standalone_metric(StorageProofOverheadMetric::new(
millau_client.clone(),
(bp_runtime::RIALTO_BRIDGE_INSTANCE, lane_id),
millau_runtime::rialto_messages::inbound_lane_data_key(&lane_id),
"millau_storage_proof_overhead".into(),
"Millau storage proof overhead".into(),
))?
.standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
millau_client,
sp_core::storage::StorageKey(millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec()),
Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE),
"millau_rialto_to_millau_conversion_rate".into(),
"Rialto to Millau tokens conversion rate (used by Rialto)".into(),
))?
.into_params(),
futures::future::pending(),
)
.await
@@ -29,7 +29,10 @@ use frame_support::dispatch::GetDispatchInfo;
use messages_relay::message_lane::MessageLane;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
@@ -135,7 +138,7 @@ pub async fn run(
millau_client: MillauClient,
millau_sign: MillauSigningParams,
lane_id: LaneId,
metrics_params: Option<MetricsParams>,
metrics_params: MetricsParams,
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_rialto = rialto_sign.public().as_array_ref().clone().into();
@@ -184,9 +187,27 @@ pub async fn run(
max_messages_size_in_single_batch,
},
},
RialtoSourceClient::new(rialto_client, lane.clone(), lane_id, MILLAU_BRIDGE_INSTANCE),
RialtoSourceClient::new(rialto_client.clone(), lane.clone(), lane_id, MILLAU_BRIDGE_INSTANCE),
MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE),
metrics_params,
relay_utils::relay_metrics(
messages_relay::message_lane_loop::metrics_prefix::<RialtoMessagesToMillau>(&lane_id),
metrics_params.address,
)
.standalone_metric(StorageProofOverheadMetric::new(
rialto_client.clone(),
(bp_runtime::MILLAU_BRIDGE_INSTANCE, lane_id),
rialto_runtime::millau_messages::inbound_lane_data_key(&lane_id),
"rialto_storage_proof_overhead".into(),
"Rialto storage proof overhead".into(),
))?
.standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
rialto_client,
sp_core::storage::StorageKey(rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec()),
Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE),
"rialto_millau_to_rialto_conversion_rate".into(),
"Millau to Rialto tokens conversion rate (used by Millau)".into(),
))?
.into_params(),
futures::future::pending(),
)
.await
@@ -22,6 +22,7 @@ use bp_header_chain::justification::GrandpaJustification;
use codec::Encode;
use relay_millau_client::{Millau, SigningParams as MillauSigningParams};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_utils::metrics::{FloatJsonValueMetric, MetricsParams};
use relay_westend_client::{SyncHeader as WestendSyncHeader, Westend};
use sp_core::{Bytes, Pair};
@@ -33,6 +34,29 @@ impl SubstrateFinalitySyncPipeline for WestendFinalityToMillau {
type TargetChain = Millau;
fn customize_metrics(params: MetricsParams) -> anyhow::Result<MetricsParams> {
Ok(
relay_utils::relay_metrics(finality_relay::metrics_prefix::<Self>(), params.address)
// Polkadot/Kusama prices are added as metrics here, because atm we don't have Polkadot <-> Kusama
// relays, but we want to test metrics/dashboards in advance
.standalone_metric(FloatJsonValueMetric::new(
"https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=usd".into(),
"$.polkadot.usd".into(),
"polkadot_price".into(),
"Polkadot price in USD".into(),
))
.map_err(|e| anyhow::format_err!("{}", e))?
.standalone_metric(FloatJsonValueMetric::new(
"https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=usd".into(),
"$.kusama.usd".into(),
"kusama_price".into(),
"Kusama price in USD".into(),
))
.map_err(|e| anyhow::format_err!("{}", e))?
.into_params(),
)
}
fn transactions_author(&self) -> bp_millau::AccountId {
self.target_sign.public().as_array_ref().clone().into()
}
+1 -1
View File
@@ -29,7 +29,7 @@ use sp_runtime::{
use std::{fmt::Debug, time::Duration};
/// Substrate-based chain from minimal relay-client point of view.
pub trait Chain: ChainBase {
pub trait Chain: ChainBase + Clone {
/// Chain name.
const NAME: &'static str;
/// Average block interval.
@@ -29,7 +29,7 @@ use jsonrpsee_types::{jsonrpc::DeserializeOwned, traits::SubscriptionClient};
use jsonrpsee_ws_client::{WsClient as RpcClient, WsConfig as RpcConfig, WsSubscription as Subscription};
use num_traits::Zero;
use pallet_balances::AccountData;
use sp_core::Bytes;
use sp_core::{storage::StorageKey, Bytes};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::ops::RangeInclusive;
@@ -177,6 +177,14 @@ impl<C: Chain> Client<C> {
Ok(Substrate::<C>::runtime_version(&self.client).await?)
}
/// Read value from runtime storage.
pub async fn storage_value<T: Decode>(&self, storage_key: StorageKey) -> Result<Option<T>> {
Substrate::<C>::get_storage(&self.client, storage_key)
.await?
.map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed))
.transpose()
}
/// Return native tokens balance of the account.
pub async fn free_native_balance(&self, account: C::AccountId) -> Result<C::NativeBalance>
where
@@ -38,6 +38,8 @@ pub enum Error {
AccountDoesNotExist,
/// The client we're connected to is not synced, so we can't rely on its state.
ClientNotSynced(Health),
/// An error has happened when we have tried to parse storage proof.
StorageProofError(bp_runtime::StorageProofError),
/// Custom logic error.
Custom(String),
}
@@ -50,6 +52,7 @@ impl std::error::Error for Error {
Self::UninitializedBridgePallet => None,
Self::AccountDoesNotExist => None,
Self::ClientNotSynced(_) => None,
Self::StorageProofError(_) => None,
Self::Custom(_) => None,
}
}
@@ -81,6 +84,7 @@ impl std::fmt::Display for Error {
Self::ResponseParseFailed(e) => e.to_string(),
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
Self::AccountDoesNotExist => "Account does not exist on the chain".into(),
Self::StorageProofError(e) => format!("Error when parsing storage proof: {:?}", e),
Self::ClientNotSynced(health) => format!("Substrate client is not synced: {}", health),
Self::Custom(e) => e.clone(),
};
@@ -172,6 +172,7 @@ mod tests {
SinkExt,
};
#[derive(Debug, Clone)]
struct TestChain;
impl bp_runtime::Chain for TestChain {
@@ -27,6 +27,7 @@ mod sync_header;
pub mod finality_source;
pub mod guard;
pub mod headers_source;
pub mod metrics;
pub use crate::chain::{BlockWithJustification, Chain, ChainWithBalances, TransactionSignScheme};
pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthoritiesSet};
@@ -0,0 +1,91 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::chain::Chain;
use crate::client::Client;
use async_trait::async_trait;
use codec::Decode;
use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64};
use sp_core::storage::StorageKey;
use sp_runtime::{traits::UniqueSaturatedInto, FixedPointNumber};
use std::time::Duration;
/// Storage value update interval (in blocks).
const UPDATE_INTERVAL_IN_BLOCKS: u32 = 5;
/// Metric that represents fixed-point runtime storage value as float gauge.
#[derive(Clone, Debug)]
pub struct FloatStorageValueMetric<C: Chain, T: Clone> {
client: Client<C>,
storage_key: StorageKey,
maybe_default_value: Option<T>,
metric: Gauge<F64>,
}
impl<C: Chain, T: Decode + FixedPointNumber> FloatStorageValueMetric<C, T> {
/// Create new metric.
pub fn new(
client: Client<C>,
storage_key: StorageKey,
maybe_default_value: Option<T>,
name: String,
help: String,
) -> Self {
FloatStorageValueMetric {
client,
storage_key,
maybe_default_value,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
qed",
),
}
}
}
impl<C: Chain, T: Clone + Send + Sync + 'static> Metrics for FloatStorageValueMetric<C, T> {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
#[async_trait]
impl<C: Chain, T> StandaloneMetrics for FloatStorageValueMetric<C, T>
where
T: 'static + Decode + Send + Sync + FixedPointNumber,
{
fn update_interval(&self) -> Duration {
C::AVERAGE_BLOCK_INTERVAL * UPDATE_INTERVAL_IN_BLOCKS
}
async fn update(&self) {
relay_utils::metrics::set_gauge_value(
&self.metric,
self.client
.storage_value::<T>(self.storage_key.clone())
.await
.map(|maybe_storage_value| {
maybe_storage_value.or(self.maybe_default_value).map(|storage_value| {
storage_value.into_inner().unique_saturated_into() as f64
/ T::DIV.unique_saturated_into() as f64
})
}),
);
}
}
@@ -0,0 +1,23 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Contains several Substrate-specific metrics that may be exposed by relay.
pub use float_storage_value::FloatStorageValueMetric;
pub use storage_proof_overhead::StorageProofOverheadMetric;
mod float_storage_value;
mod storage_proof_overhead;
@@ -0,0 +1,135 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::chain::Chain;
use crate::client::Client;
use crate::error::Error;
use async_trait::async_trait;
use bp_messages::LaneId;
use bp_runtime::InstanceId;
use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, U64};
use sp_core::storage::StorageKey;
use sp_runtime::traits::Header as HeaderT;
use sp_trie::StorageProof;
use std::time::Duration;
/// Storage proof overhead update interval (in blocks).
const UPDATE_INTERVAL_IN_BLOCKS: u32 = 100;
/// Metric that represents extra size of storage proof as unsigned integer gauge.
///
/// Regular Substrate node does not provide any RPC endpoints that return storage proofs.
/// So here we're using our own `pallet-bridge-messages-rpc` RPC API, which returns proof
/// of the inbound message lane state. Then we simply subtract size of this state from
/// the size of storage proof to compute metric value.
///
/// There are two things to keep in mind when using this metric:
///
/// 1) it'll only work on inbound lanes that have already accepted at least one message;
/// 2) the overhead may be slightly different for other values, but this metric gives a good estimation.
#[derive(Debug)]
pub struct StorageProofOverheadMetric<C: Chain> {
client: Client<C>,
inbound_lane: (InstanceId, LaneId),
inbound_lane_data_key: StorageKey,
metric: Gauge<U64>,
}
impl<C: Chain> Clone for StorageProofOverheadMetric<C> {
fn clone(&self) -> Self {
StorageProofOverheadMetric {
client: self.client.clone(),
inbound_lane: self.inbound_lane,
inbound_lane_data_key: self.inbound_lane_data_key.clone(),
metric: self.metric.clone(),
}
}
}
impl<C: Chain> StorageProofOverheadMetric<C> {
/// Create new metric instance with given name and help.
pub fn new(
client: Client<C>,
inbound_lane: (InstanceId, LaneId),
inbound_lane_data_key: StorageKey,
name: String,
help: String,
) -> Self {
StorageProofOverheadMetric {
client,
inbound_lane,
inbound_lane_data_key,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
qed",
),
}
}
/// Returns approximate storage proof size overhead.
///
/// Returs `Ok(None)` if inbound lane we're watching for has no state. This shouldn't be treated as error.
async fn compute_storage_proof_overhead(&self) -> Result<Option<usize>, Error> {
let best_header_hash = self.client.best_finalized_header_hash().await?;
let best_header = self.client.header_by_hash(best_header_hash).await?;
let storage_proof = self
.client
.prove_messages_delivery(self.inbound_lane.0, self.inbound_lane.1, best_header_hash)
.await?;
let storage_proof_size: usize = storage_proof.iter().map(|n| n.len()).sum();
let storage_value_reader = bp_runtime::StorageProofChecker::<C::Hasher>::new(
*best_header.state_root(),
StorageProof::new(storage_proof),
)
.map_err(Error::StorageProofError)?;
let maybe_encoded_storage_value = storage_value_reader
.read_value(&self.inbound_lane_data_key.0)
.map_err(Error::StorageProofError)?;
let encoded_storage_value_size = match maybe_encoded_storage_value {
Some(encoded_storage_value) => encoded_storage_value.len(),
None => return Ok(None),
};
Ok(Some(storage_proof_size - encoded_storage_value_size))
}
}
impl<C: Chain> Metrics for StorageProofOverheadMetric<C> {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
#[async_trait]
impl<C: Chain> StandaloneMetrics for StorageProofOverheadMetric<C> {
fn update_interval(&self) -> Duration {
C::AVERAGE_BLOCK_INTERVAL * UPDATE_INTERVAL_IN_BLOCKS
}
async fn update(&self) {
relay_utils::metrics::set_gauge_value(
&self.metric,
self.compute_storage_proof_overhead()
.await
.map(|v| v.map(|overhead| overhead as u64)),
);
}
}
+7 -4
View File
@@ -83,16 +83,19 @@ pub async fn run<P: TransactionProofPipeline>(
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: Option<MetricsParams>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME))
.with_metrics(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
)
.loop_metric(ExchangeLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.expose(metrics_params)
.expose()
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
@@ -303,7 +306,7 @@ mod tests {
storage,
source,
target,
None,
MetricsParams::disabled(),
exit_receiver.into_future().map(|(_, _)| ()),
));
}
+8 -3
View File
@@ -90,20 +90,25 @@ pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop.
pub fn metrics_prefix<P: FinalitySyncPipeline>() -> String {
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
}
/// Run finality proofs synchronization loop.
pub async fn run<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_params: Option<MetricsParams>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME))
.with_metrics(metrics_prefix::<P>(), metrics_params)
.loop_metric(SyncLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.expose(metrics_params)
.expose()
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
@@ -27,7 +27,7 @@ use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use relay_utils::{relay_loop::Client as RelayClient, MaybeConnectionError};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, MaybeConnectionError};
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
type IsMandatory = bool;
@@ -206,7 +206,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
source_client,
target_client,
sync_params,
None,
MetricsParams::disabled(),
exit_receiver.into_future().map(|(_, _)| ()),
));
+1 -1
View File
@@ -19,7 +19,7 @@
//! are still submitted to the target node, but are treated as auxiliary data as we are not trying
//! to submit all source headers to the target node.
pub use crate::finality_loop::{run, FinalitySyncParams, SourceClient, TargetClient};
pub use crate::finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient};
use bp_header_chain::FinalityProof;
use std::fmt::Debug;
+3 -3
View File
@@ -119,15 +119,15 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_tick: Duration,
sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams,
metrics_params: Option<MetricsParams>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME))
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), metrics_params)
.loop_metric(SyncLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.expose(metrics_params)
.expose()
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
@@ -24,7 +24,8 @@ use backoff::backoff::Backoff;
use futures::{future::FutureExt, stream::StreamExt};
use parking_lot::Mutex;
use relay_utils::{
process_future_result, relay_loop::Client as RelayClient, retry_backoff, HeaderId, MaybeConnectionError,
metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient, retry_backoff, HeaderId,
MaybeConnectionError,
};
use std::{
collections::{HashMap, HashSet},
@@ -500,7 +501,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) {
test_tick(),
(),
crate::sync::tests::default_sync_params(),
None,
MetricsParams::disabled(),
exit_receiver.into_future().map(|(_, _)| ()),
));
}
@@ -211,26 +211,31 @@ pub struct ClientsState<P: MessageLane> {
pub target: Option<TargetClientState<P>>,
}
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop.
pub fn metrics_prefix<P: MessageLane>(lane: &LaneId) -> String {
format!(
"{}_to_{}_MessageLane_{}",
P::SOURCE_NAME,
P::TARGET_NAME,
hex::encode(lane)
)
}
/// Run message lane service loop.
pub async fn run<P: MessageLane>(
params: Params,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: Option<MetricsParams>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.reconnect_delay(params.reconnect_delay)
.with_metrics(format!(
"{}_to_{}_MessageLane_{}",
P::SOURCE_NAME,
P::TARGET_NAME,
hex::encode(params.lane)
))
.with_metrics(metrics_prefix::<P>(&params.lane), metrics_params)
.loop_metric(MessageLaneLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.expose(metrics_params)
.expose()
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
@@ -722,7 +727,7 @@ pub(crate) mod tests {
},
source_client,
target_client,
None,
MetricsParams::disabled(),
exit_signal,
)
.await;
+3
View File
@@ -10,10 +10,13 @@ ansi_term = "0.12"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
isahc = "1.2"
env_logger = "0.8.2"
futures = "0.3.5"
jsonpath_lib = "0.2"
log = "0.4.11"
num-traits = "0.2"
serde_json = "1.0"
sysinfo = "0.15"
time = "0.2"
+1 -1
View File
@@ -16,7 +16,7 @@
//! Utilities used by different relays.
pub use relay_loop::relay_loop;
pub use relay_loop::{relay_loop, relay_metrics};
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::future::FutureExt;
+73 -6
View File
@@ -14,23 +14,37 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
pub use float_json_value::FloatJsonValueMetric;
pub use global::GlobalMetrics;
pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64};
pub use substrate_prometheus_endpoint::{
prometheus::core::{Atomic, Collector},
register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64,
};
use async_trait::async_trait;
use std::time::Duration;
use std::{fmt::Debug, time::Duration};
mod float_json_value;
mod global;
/// Prometheus endpoint MetricsParams.
/// Unparsed address that needs to be used to expose Prometheus metrics.
#[derive(Debug, Clone)]
pub struct MetricsParams {
pub struct MetricsAddress {
/// Serve HTTP requests at given host.
pub host: String,
/// Serve HTTP requests at given port.
pub port: u16,
}
/// Prometheus endpoint MetricsParams.
#[derive(Debug, Clone)]
pub struct MetricsParams {
/// Interface and TCP port to be used when exposing Prometheus metrics.
pub address: Option<MetricsAddress>,
/// Metrics registry. May be `Some(_)` if several components share the same endpoint.
pub registry: Option<Registry>,
}
/// Metrics API.
pub trait Metrics: Clone + Send + Sync + 'static {
/// Register metrics in the registry.
@@ -61,11 +75,64 @@ pub trait StandaloneMetrics: Metrics {
}
}
impl Default for MetricsParams {
impl Default for MetricsAddress {
fn default() -> Self {
MetricsParams {
MetricsAddress {
host: "127.0.0.1".into(),
port: 9616,
}
}
}
impl MetricsParams {
/// Creates metrics params so that metrics are not exposed.
pub fn disabled() -> Self {
MetricsParams {
address: None,
registry: None,
}
}
}
impl From<Option<MetricsAddress>> for MetricsParams {
fn from(address: Option<MetricsAddress>) -> Self {
MetricsParams {
address,
registry: None,
}
}
}
/// Set value of gauge metric.
///
/// If value is `Ok(None)` or `Err(_)`, metric would have default value.
pub fn set_gauge_value<T: Default + Debug, V: Atomic<T = T>, E: Debug>(gauge: &Gauge<V>, value: Result<Option<T>, E>) {
gauge.set(match value {
Ok(Some(value)) => {
log::trace!(
target: "bridge-metrics",
"Updated value of metric '{:?}': {:?}",
gauge.desc().first().map(|d| &d.fq_name),
value,
);
value
}
Ok(None) => {
log::warn!(
target: "bridge-metrics",
"Failed to update metric '{:?}': value is empty",
gauge.desc().first().map(|d| &d.fq_name),
);
Default::default()
}
Err(error) => {
log::warn!(
target: "bridge-metrics",
"Failed to update metric '{:?}': {:?}",
gauge.desc().first().map(|d| &d.fq_name),
error,
);
Default::default()
}
})
}
@@ -0,0 +1,125 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64};
use async_trait::async_trait;
use std::time::Duration;
/// Value update interval.
const UPDATE_INTERVAL: Duration = Duration::from_secs(60);
/// Metric that represents float value received from HTTP service as float gauge.
#[derive(Debug, Clone)]
pub struct FloatJsonValueMetric {
url: String,
json_path: String,
metric: Gauge<F64>,
}
impl FloatJsonValueMetric {
/// Create new metric instance with given name and help.
pub fn new(url: String, json_path: String, name: String, help: String) -> Self {
FloatJsonValueMetric {
url,
json_path,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
qed",
),
}
}
/// Read value from HTTP service.
async fn read_value(&self) -> Result<f64, String> {
use isahc::{AsyncReadResponseExt, HttpClient, Request};
fn map_isahc_err(err: impl std::fmt::Display) -> String {
format!("Failed to fetch token price from remote server: {}", err)
}
let request = Request::get(&self.url)
.header("Accept", "application/json")
.body(())
.map_err(map_isahc_err)?;
let raw_response = HttpClient::new()
.map_err(map_isahc_err)?
.send_async(request)
.await
.map_err(map_isahc_err)?
.text()
.await
.map_err(map_isahc_err)?;
parse_service_response(&self.json_path, &raw_response)
}
}
impl Metrics for FloatJsonValueMetric {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
#[async_trait]
impl StandaloneMetrics for FloatJsonValueMetric {
fn update_interval(&self) -> Duration {
UPDATE_INTERVAL
}
async fn update(&self) {
crate::metrics::set_gauge_value(&self.metric, self.read_value().await.map(Some));
}
}
/// Parse HTTP service response.
fn parse_service_response(json_path: &str, response: &str) -> Result<f64, String> {
let json = serde_json::from_str(response).map_err(|err| {
format!(
"Failed to parse HTTP service response: {:?}. Response: {:?}",
err, response,
)
})?;
let mut selector = jsonpath_lib::selector(&json);
let maybe_selected_value = selector(json_path).map_err(|err| {
format!(
"Failed to select value from response: {:?}. Response: {:?}",
err, response,
)
})?;
let selected_value = maybe_selected_value
.first()
.and_then(|v| v.as_f64())
.ok_or_else(|| format!("Missing required value from response: {:?}", response,))?;
Ok(selected_value)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_service_response_works() {
assert_eq!(
parse_service_response("$.kusama.usd", r#"{"kusama":{"usd":433.05}}"#).map_err(drop),
Ok(433.05),
);
}
}
+2 -3
View File
@@ -16,12 +16,11 @@
//! Global system-wide Prometheus metrics exposed by relays.
use crate::metrics::{Metrics, StandaloneMetrics};
use crate::metrics::{register, Gauge, GaugeVec, Metrics, Opts, Registry, StandaloneMetrics, F64, U64};
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use std::time::Duration;
use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64};
use sysinfo::{ProcessExt, RefreshKind, System, SystemExt};
/// Global metrics update interval.
@@ -79,7 +78,7 @@ impl StandaloneMetrics for GlobalMetrics {
}
_ => {
log::warn!(
target: "bridge",
target: "bridge-metrics",
"Failed to refresh process information. Metrics may show obsolete values",
);
}
+43 -11
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{Metrics, MetricsParams, StandaloneMetrics};
use crate::metrics::{Metrics, MetricsAddress, MetricsParams, StandaloneMetrics};
use crate::{FailedClient, MaybeConnectionError};
use async_trait::async_trait;
@@ -44,6 +44,24 @@ pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC,
}
}
/// Returns generic relay loop metrics that may be customized and used in one or several relay loops.
pub fn relay_metrics(prefix: String, address: Option<MetricsAddress>) -> LoopMetrics<(), (), ()> {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
source_client: (),
target_client: (),
loop_metric: None,
},
address,
registry: Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed"),
loop_metric: None,
}
}
/// Generic relay loop.
pub struct Loop<SC, TC, LM> {
reconnect_delay: Duration,
@@ -55,6 +73,7 @@ pub struct Loop<SC, TC, LM> {
/// Relay loop metrics builder.
pub struct LoopMetrics<SC, TC, LM> {
relay_loop: Loop<SC, TC, ()>,
address: Option<MetricsAddress>,
registry: Registry,
loop_metric: Option<LM>,
}
@@ -69,7 +88,7 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
/// Start building loop metrics using given prefix.
///
/// Panics if `prefix` is empty.
pub fn with_metrics(self, prefix: String) -> LoopMetrics<SC, TC, ()> {
pub fn with_metrics(self, prefix: String, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
LoopMetrics {
@@ -79,8 +98,12 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
target_client: self.target_client,
loop_metric: None,
},
registry: Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed"),
address: params.address,
registry: match params.registry {
Some(registry) => registry,
None => Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed"),
},
loop_metric: None,
}
}
@@ -159,6 +182,7 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
Ok(LoopMetrics {
relay_loop: self.relay_loop,
address: self.address,
registry: self.registry,
loop_metric: Some(loop_metric),
})
@@ -171,19 +195,27 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
Ok(self)
}
/// Expose metrics using given params.
/// Convert into `MetricsParams` structure so that metrics registry may be extended later.
pub fn into_params(self) -> MetricsParams {
MetricsParams {
address: self.address,
registry: Some(self.registry),
}
}
/// Expose metrics using address passed at creation.
///
/// If `params` is `None`, metrics are not exposed.
pub async fn expose(self, params: Option<MetricsParams>) -> Result<Loop<SC, TC, LM>, String> {
if let Some(params) = params {
/// If passed `address` is `None`, metrics are not exposed.
pub async fn expose(self) -> Result<Loop<SC, TC, LM>, String> {
if let Some(address) = self.address {
let socket_addr = SocketAddr::new(
params.host.parse().map_err(|err| {
address.host.parse().map_err(|err| {
format!(
"Invalid host {} is used to expose Prometheus metrics: {}",
params.host, err,
address.host, err,
)
})?,
params.port,
address.port,
);
let registry = self.registry;