Shared reference to conversion rate metric value (#1034)

* shared conversion rate metric value

* clippy
This commit is contained in:
Svyatoslav Nikolsky
2021-06-29 14:24:54 +03:00
committed by Bastian Köcher
parent db0216dabb
commit ecd20d9d24
23 changed files with 260 additions and 161 deletions
+1
View File
@@ -7,6 +7,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
ansi_term = "0.12"
anyhow = "1.0"
async-std = "1.9.0"
async-trait = "0.1.42"
clap = { version = "2.33.3", features = ["yaml"] }
@@ -355,7 +355,7 @@ async fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_has
async fn run_auto_transactions_relay_loop(
params: EthereumExchangeParams,
eth_start_with_block_number: Option<u64>,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let EthereumExchangeParams {
eth_params,
sub_params,
@@ -375,7 +375,7 @@ async fn run_auto_transactions_relay_loop(
.best_ethereum_finalized_block()
.await
.map_err(|err| {
format!(
anyhow::format_err!(
"Error retrieving best finalized Ethereum block from Substrate node: {:?}",
err
)
@@ -292,7 +292,7 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
futures::future::pending(),
)
.await
.map_err(RpcError::SyncLoop)?;
.map_err(|e| RpcError::SyncLoop(e.to_string()))?;
Ok(())
}
@@ -194,7 +194,7 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
futures::future::pending(),
)
.await
.map_err(RpcError::SyncLoop)?;
.map_err(|e| RpcError::SyncLoop(e.to_string()))?;
Ok(())
}
@@ -17,7 +17,8 @@
//! Millau-to-Rialto messages sync entrypoint.
use crate::messages_lane::{
select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
select_delivery_transaction_limits, MessagesRelayParams, StandaloneMessagesMetrics, SubstrateMessageLane,
SubstrateMessageLaneToSubstrate,
};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
@@ -30,10 +31,8 @@ use frame_support::dispatch::GetDispatchInfo;
use messages_relay::message_lane::MessageLane;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
use relay_substrate_client::{Chain, Client, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
@@ -136,7 +135,7 @@ type RialtoTargetClient =
/// Run Millau-to-Rialto messages sync.
pub async fn run(
params: MessagesRelayParams<Millau, MillauSigningParams, Rialto, RialtoSigningParams>,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_millau = (*params.source_sign.public().as_array_ref()).into();
@@ -172,6 +171,7 @@ pub async fn run(
max_messages_weight_in_single_batch,
);
let (metrics_params, _) = add_standalone_metrics(params.metrics_params, source_client.clone())?;
messages_relay::message_lane_loop::run(
messages_relay::message_lane_loop::Params {
lane: lane_id,
@@ -202,36 +202,25 @@ pub async fn run(
MILLAU_CHAIN_ID,
params.source_to_target_headers_relay,
),
relay_utils::relay_metrics(
Some(messages_relay::message_lane_loop::metrics_prefix::<
MillauMessagesToRialto,
>(&lane_id)),
params.metrics_params,
)
.standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
"millau_storage_proof_overhead".into(),
"Millau storage proof overhead".into(),
)
})?
.standalone_metric(|registry, prefix| {
FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
registry,
prefix,
source_client,
sp_core::storage::StorageKey(
millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec(),
),
Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE),
"millau_rialto_to_millau_conversion_rate".into(),
"Rialto to Millau tokens conversion rate (used by Rialto)".into(),
)
})?
.into_params(),
metrics_params,
futures::future::pending(),
)
.await
}
/// Add standalone metrics for the Millau -> Rialto messages loop.
pub(crate) fn add_standalone_metrics(
metrics_params: MetricsParams,
source_client: Client<Millau>,
) -> anyhow::Result<(MetricsParams, StandaloneMessagesMetrics)> {
crate::messages_lane::add_standalone_metrics::<MillauMessagesToRialto>(
metrics_params,
source_client,
None,
None,
Some((
sp_core::storage::StorageKey(millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec()),
millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE,
)),
)
}
+28 -28
View File
@@ -32,38 +32,38 @@ mod rococo;
mod westend;
mod wococo;
use relay_utils::metrics::{FloatJsonValueMetric, MetricsParams};
use relay_utils::metrics::{FloatJsonValueMetric, MetricsParams, PrometheusError, Registry};
pub(crate) fn add_polkadot_kusama_price_metrics<T: finality_relay::FinalitySyncPipeline>(
params: MetricsParams,
) -> anyhow::Result<MetricsParams> {
Ok(
relay_utils::relay_metrics(Some(finality_relay::metrics_prefix::<T>()), params)
// Polkadot/Kusama prices are added as metrics here, because atm we don't have Polkadot <-> Kusama
// relays, but we want to test metrics/dashboards in advance
.standalone_metric(|registry, prefix| {
FloatJsonValueMetric::new(
registry,
prefix,
"https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=btc".into(),
"$.polkadot.btc".into(),
"polkadot_to_base_conversion_rate".into(),
"Rate used to convert from DOT to some BASE tokens".into(),
)
})
.map_err(|e| anyhow::format_err!("{}", e))?
.standalone_metric(|registry, prefix| {
FloatJsonValueMetric::new(
registry,
prefix,
"https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=btc".into(),
"$.kusama.btc".into(),
"kusama_to_base_conversion_rate".into(),
"Rate used to convert from KSM to some BASE tokens".into(),
)
})
.map_err(|e| anyhow::format_err!("{}", e))?
.into_params(),
// Polkadot/Kusama prices are added as metrics here, because atm we don't have Polkadot <-> Kusama
// relays, but we want to test metrics/dashboards in advance
Ok(relay_utils::relay_metrics(None, params)
.standalone_metric(|registry, prefix| token_price_metric(registry, prefix, "polkadot"))?
.standalone_metric(|registry, prefix| token_price_metric(registry, prefix, "kusama"))?
.into_params())
}
/// Creates standalone token price metric.
pub(crate) fn token_price_metric(
registry: &Registry,
prefix: Option<&str>,
token_id: &str,
) -> Result<FloatJsonValueMetric, PrometheusError> {
FloatJsonValueMetric::new(
registry,
prefix,
format!(
"https://api.coingecko.com/api/v3/simple/price?ids={}&vs_currencies=btc",
token_id
),
format!("$.{}.btc", token_id),
format!("{}_to_base_conversion_rate", token_id.replace("-", "_")),
format!(
"Rate used to convert from {} to some BASE tokens",
token_id.to_uppercase()
),
)
}
@@ -17,7 +17,8 @@
//! Rialto-to-Millau messages sync entrypoint.
use crate::messages_lane::{
select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
select_delivery_transaction_limits, MessagesRelayParams, StandaloneMessagesMetrics, SubstrateMessageLane,
SubstrateMessageLaneToSubstrate,
};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
@@ -30,10 +31,8 @@ use frame_support::dispatch::GetDispatchInfo;
use messages_relay::message_lane::MessageLane;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
use relay_substrate_client::{Chain, Client, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
@@ -136,7 +135,7 @@ type MillauTargetClient =
/// Run Rialto-to-Millau messages sync.
pub async fn run(
params: MessagesRelayParams<Rialto, RialtoSigningParams, Millau, MillauSigningParams>,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_rialto = (*params.source_sign.public().as_array_ref()).into();
@@ -171,6 +170,7 @@ pub async fn run(
max_messages_weight_in_single_batch,
);
let (metrics_params, _) = add_standalone_metrics(params.metrics_params, source_client.clone())?;
messages_relay::message_lane_loop::run(
messages_relay::message_lane_loop::Params {
lane: lane_id,
@@ -201,36 +201,25 @@ pub async fn run(
RIALTO_CHAIN_ID,
params.source_to_target_headers_relay,
),
relay_utils::relay_metrics(
Some(messages_relay::message_lane_loop::metrics_prefix::<
RialtoMessagesToMillau,
>(&lane_id)),
params.metrics_params,
)
.standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
"rialto_storage_proof_overhead".into(),
"Rialto storage proof overhead".into(),
)
})?
.standalone_metric(|registry, prefix| {
FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
registry,
prefix,
source_client,
sp_core::storage::StorageKey(
rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec(),
),
Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE),
"rialto_millau_to_rialto_conversion_rate".into(),
"Millau to Rialto tokens conversion rate (used by Millau)".into(),
)
})?
.into_params(),
metrics_params,
futures::future::pending(),
)
.await
}
/// Add standalone metrics for the Rialto -> Millau messages loop.
pub(crate) fn add_standalone_metrics(
metrics_params: MetricsParams,
source_client: Client<Rialto>,
) -> anyhow::Result<(MetricsParams, StandaloneMessagesMetrics)> {
crate::messages_lane::add_standalone_metrics::<RialtoMessagesToMillau>(
metrics_params,
source_client,
None,
None,
Some((
sp_core::storage::StorageKey(rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec()),
rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE,
)),
)
}
@@ -17,7 +17,8 @@
//! Rococo-to-Wococo messages sync entrypoint.
use crate::messages_lane::{
select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
select_delivery_transaction_limits, MessagesRelayParams, StandaloneMessagesMetrics, SubstrateMessageLane,
SubstrateMessageLaneToSubstrate,
};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
@@ -28,7 +29,8 @@ use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::Encode;
use messages_relay::message_lane::MessageLane;
use relay_rococo_client::{HeaderId as RococoHeaderId, Rococo, SigningParams as RococoSigningParams};
use relay_substrate_client::{metrics::StorageProofOverheadMetric, Chain, TransactionSignScheme};
use relay_substrate_client::{Chain, Client, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use relay_wococo_client::{HeaderId as WococoHeaderId, SigningParams as WococoSigningParams, Wococo};
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
@@ -142,7 +144,7 @@ type WococoTargetClient = SubstrateMessagesTarget<
/// Run Rococo-to-Wococo messages sync.
pub async fn run(
params: MessagesRelayParams<Rococo, RococoSigningParams, Wococo, WococoSigningParams>,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_rococo = (*params.source_sign.public().as_array_ref()).into();
@@ -183,6 +185,7 @@ pub async fn run(
max_messages_weight_in_single_batch,
);
let (metrics_params, _) = add_standalone_metrics(params.metrics_params, source_client.clone())?;
messages_relay::message_lane_loop::run(
messages_relay::message_lane_loop::Params {
lane: lane_id,
@@ -213,23 +216,22 @@ pub async fn run(
ROCOCO_CHAIN_ID,
params.source_to_target_headers_relay,
),
relay_utils::relay_metrics(
Some(messages_relay::message_lane_loop::metrics_prefix::<
RococoMessagesToWococo,
>(&lane_id)),
params.metrics_params,
)
.standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
"rococo_storage_proof_overhead".into(),
"Rococo storage proof overhead".into(),
)
})?
.into_params(),
metrics_params,
futures::future::pending(),
)
.await
}
/// Add standalone metrics for the Rococo -> Wococo messages loop.
pub(crate) fn add_standalone_metrics(
metrics_params: MetricsParams,
source_client: Client<Rococo>,
) -> anyhow::Result<(MetricsParams, StandaloneMessagesMetrics)> {
crate::messages_lane::add_standalone_metrics::<RococoMessagesToWococo>(
metrics_params,
source_client,
None,
None,
None,
)
}
@@ -17,7 +17,8 @@
//! Wococo-to-Rococo messages sync entrypoint.
use crate::messages_lane::{
select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
select_delivery_transaction_limits, MessagesRelayParams, StandaloneMessagesMetrics, SubstrateMessageLane,
SubstrateMessageLaneToSubstrate,
};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
@@ -28,7 +29,8 @@ use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::Encode;
use messages_relay::message_lane::MessageLane;
use relay_rococo_client::{HeaderId as RococoHeaderId, Rococo, SigningParams as RococoSigningParams};
use relay_substrate_client::{metrics::StorageProofOverheadMetric, Chain, TransactionSignScheme};
use relay_substrate_client::{Chain, Client, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use relay_wococo_client::{HeaderId as WococoHeaderId, SigningParams as WococoSigningParams, Wococo};
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
@@ -142,7 +144,7 @@ type RococoTargetClient = SubstrateMessagesTarget<
/// Run Wococo-to-Rococo messages sync.
pub async fn run(
params: MessagesRelayParams<Wococo, WococoSigningParams, Rococo, RococoSigningParams>,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_wococo = (*params.source_sign.public().as_array_ref()).into();
@@ -183,6 +185,7 @@ pub async fn run(
max_messages_weight_in_single_batch,
);
let (metrics_params, _) = add_standalone_metrics(params.metrics_params, source_client.clone())?;
messages_relay::message_lane_loop::run(
messages_relay::message_lane_loop::Params {
lane: lane_id,
@@ -213,23 +216,22 @@ pub async fn run(
WOCOCO_CHAIN_ID,
params.source_to_target_headers_relay,
),
relay_utils::relay_metrics(
Some(messages_relay::message_lane_loop::metrics_prefix::<
WococoMessagesToRococo,
>(&lane_id)),
params.metrics_params,
)
.standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
"wococo_storage_proof_overhead".into(),
"Wococo storage proof overhead".into(),
)
})?
.into_params(),
metrics_params,
futures::future::pending(),
)
.await
}
/// Add standalone metrics for the Wococo -> Rococo messages loop.
pub(crate) fn add_standalone_metrics(
metrics_params: MetricsParams,
source_client: Client<Wococo>,
) -> anyhow::Result<(MetricsParams, StandaloneMessagesMetrics)> {
crate::messages_lane::add_standalone_metrics::<WococoMessagesToRococo>(
metrics_params,
source_client,
None,
None,
None,
)
}
@@ -100,8 +100,12 @@ macro_rules! select_bridge {
const MAX_MISSING_LEFT_HEADERS_AT_RIGHT: bp_millau::BlockNumber = bp_millau::SESSION_LENGTH;
const MAX_MISSING_RIGHT_HEADERS_AT_LEFT: bp_rialto::BlockNumber = bp_rialto::SESSION_LENGTH;
use crate::chains::millau_messages_to_rialto::run as left_to_right_messages;
use crate::chains::rialto_messages_to_millau::run as right_to_left_messages;
use crate::chains::millau_messages_to_rialto::{
add_standalone_metrics as add_left_to_right_standalone_metrics, run as left_to_right_messages,
};
use crate::chains::rialto_messages_to_millau::{
add_standalone_metrics as add_right_to_left_standalone_metrics, run as right_to_left_messages,
};
$generic
}
@@ -120,8 +124,12 @@ macro_rules! select_bridge {
const MAX_MISSING_LEFT_HEADERS_AT_RIGHT: bp_rococo::BlockNumber = bp_rococo::SESSION_LENGTH;
const MAX_MISSING_RIGHT_HEADERS_AT_LEFT: bp_wococo::BlockNumber = bp_wococo::SESSION_LENGTH;
use crate::chains::rococo_messages_to_wococo::run as left_to_right_messages;
use crate::chains::wococo_messages_to_rococo::run as right_to_left_messages;
use crate::chains::rococo_messages_to_wococo::{
add_standalone_metrics as add_left_to_right_standalone_metrics, run as left_to_right_messages,
};
use crate::chains::wococo_messages_to_rococo::{
add_standalone_metrics as add_right_to_left_standalone_metrics, run as right_to_left_messages,
};
$generic
}
@@ -153,6 +161,8 @@ impl RelayHeadersAndMessages {
let metrics_params: MetricsParams = params.shared.prometheus_params.into();
let metrics_params = relay_utils::relay_metrics(None, metrics_params).into_params();
let (metrics_params, _) = add_left_to_right_standalone_metrics(metrics_params, left_client.clone())?;
let (metrics_params, _) = add_right_to_left_standalone_metrics(metrics_params, right_client.clone())?;
let left_to_right_on_demand_headers = OnDemandHeadersRelay::new(
left_client.clone(),
@@ -21,9 +21,16 @@ use crate::on_demand_headers::OnDemandHeadersRelay;
use bp_messages::{LaneId, MessageNonce};
use frame_support::weights::Weight;
use messages_relay::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf};
use relay_utils::{metrics::MetricsParams, BlockNumberBase};
use sp_core::Bytes;
use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
BlockNumberOf, Chain, Client, HashOf,
};
use relay_utils::{
metrics::{F64SharedRef, MetricsParams},
BlockNumberBase,
};
use sp_core::{storage::StorageKey, Bytes};
use sp_runtime::FixedU128;
use std::ops::RangeInclusive;
/// Substrate <-> Substrate messages relay parameters.
@@ -185,6 +192,84 @@ pub fn select_delivery_transaction_limits<W: pallet_bridge_messages::WeightInfoE
(max_number_of_messages, weight_for_messages_dispatch)
}
/// Shared references to the values of standalone metrics of the message lane relay loop.
#[derive(Debug, Clone)]
pub struct StandaloneMessagesMetrics {
/// Shared reference to the actual target -> <base> chain token conversion rate.
pub target_to_base_conversion_rate: Option<F64SharedRef>,
/// Shared reference to the actual source -> <base> chain token conversion rate.
pub source_to_base_conversion_rate: Option<F64SharedRef>,
}
/// Add general standalone metrics for the message lane relay loop.
pub fn add_standalone_metrics<P: SubstrateMessageLane>(
metrics_params: MetricsParams,
source_client: Client<P::SourceChain>,
source_chain_token_id: Option<&str>,
target_chain_token_id: Option<&str>,
target_to_source_conversion_rate_params: Option<(StorageKey, FixedU128)>,
) -> anyhow::Result<(MetricsParams, StandaloneMessagesMetrics)> {
let mut source_to_base_conversion_rate = None;
let mut target_to_base_conversion_rate = None;
let mut metrics_params =
relay_utils::relay_metrics(None, metrics_params).standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
format!("{}_storage_proof_overhead", P::SourceChain::NAME.to_lowercase()),
format!("{} storage proof overhead", P::SourceChain::NAME),
)
})?;
if let Some((target_to_source_conversion_rate_storage_key, initial_target_to_source_conversion_rate)) =
target_to_source_conversion_rate_params
{
metrics_params = metrics_params.standalone_metric(|registry, prefix| {
let metric = FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
registry,
prefix,
source_client,
target_to_source_conversion_rate_storage_key,
Some(initial_target_to_source_conversion_rate),
format!(
"{}_{}_to_{}_conversion_rate",
P::SourceChain::NAME,
P::TargetChain::NAME,
P::SourceChain::NAME
),
format!(
"{} to {} tokens conversion rate (used by {})",
P::TargetChain::NAME,
P::SourceChain::NAME,
P::SourceChain::NAME
),
)?;
Ok(metric)
})?;
}
if let Some(source_chain_token_id) = source_chain_token_id {
metrics_params = metrics_params.standalone_metric(|registry, prefix| {
let metric = crate::chains::token_price_metric(registry, prefix, source_chain_token_id)?;
source_to_base_conversion_rate = Some(metric.shared_value_ref());
Ok(metric)
})?;
}
if let Some(target_chain_token_id) = target_chain_token_id {
metrics_params = metrics_params.standalone_metric(|registry, prefix| {
let metric = crate::chains::token_price_metric(registry, prefix, target_chain_token_id)?;
target_to_base_conversion_rate = Some(metric.shared_value_ref());
Ok(metric)
})?;
}
Ok((
metrics_params.into_params(),
StandaloneMessagesMetrics {
source_to_base_conversion_rate,
target_to_base_conversion_rate,
},
))
}
#[cfg(test)]
mod tests {
use super::*;
+1
View File
@@ -6,6 +6,7 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
anyhow = "1.0"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
+1 -1
View File
@@ -90,7 +90,7 @@ pub async fn run<P: TransactionProofPipeline>(
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
+1
View File
@@ -7,6 +7,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
description = "Finality proofs relay"
[dependencies]
anyhow = "1.0"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
+1 -1
View File
@@ -104,7 +104,7 @@ pub async fn run<P: FinalitySyncPipeline>(
sync_params: FinalitySyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
+1
View File
@@ -6,6 +6,7 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
anyhow = "1.0"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
+1 -1
View File
@@ -126,7 +126,7 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
sync_params: HeadersSyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
+1
View File
@@ -6,6 +6,7 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
anyhow = "1.0"
async-std = { version = "1.6.5", features = ["attributes"] }
async-trait = "0.1.40"
futures = "0.3.5"
@@ -258,7 +258,7 @@ pub async fn run<P: MessageLane>(
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), String> {
) -> anyhow::Result<()> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.reconnect_delay(params.reconnect_delay)
+1
View File
@@ -7,6 +7,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
ansi_term = "0.12"
anyhow = "1.0"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
+4
View File
@@ -21,12 +21,16 @@ pub use substrate_prometheus_endpoint::{
register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, U64,
};
use async_std::sync::{Arc, RwLock};
use async_trait::async_trait;
use std::{fmt::Debug, time::Duration};
mod float_json_value;
mod global;
/// Shared reference to `f64` value that is updated by the metric.
pub type F64SharedRef = Arc<RwLock<Option<f64>>>;
/// Unparsed address that needs to be used to expose Prometheus metrics.
#[derive(Debug, Clone)]
pub struct MetricsAddress {
@@ -14,8 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, F64};
use crate::metrics::{metric_name, register, F64SharedRef, Gauge, PrometheusError, Registry, StandaloneMetrics, F64};
use async_std::sync::{Arc, RwLock};
use async_trait::async_trait;
use std::time::Duration;
@@ -28,6 +29,7 @@ pub struct FloatJsonValueMetric {
url: String,
json_path: String,
metric: Gauge<F64>,
shared_value_ref: F64SharedRef,
}
impl FloatJsonValueMetric {
@@ -40,13 +42,20 @@ impl FloatJsonValueMetric {
name: String,
help: String,
) -> Result<Self, PrometheusError> {
let shared_value_ref = Arc::new(RwLock::new(None));
Ok(FloatJsonValueMetric {
url,
json_path,
metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
shared_value_ref,
})
}
/// Get shared reference to metric value.
pub fn shared_value_ref(&self) -> F64SharedRef {
self.shared_value_ref.clone()
}
/// Read value from HTTP service.
async fn read_value(&self) -> Result<f64, String> {
use isahc::{AsyncReadResponseExt, HttpClient, Request};
@@ -79,7 +88,9 @@ impl StandaloneMetrics for FloatJsonValueMetric {
}
async fn update(&self) {
crate::metrics::set_gauge_value(&self.metric, self.read_value().await.map(Some));
let value = self.read_value().await;
crate::metrics::set_gauge_value(&self.metric, value.clone().map(Some));
*self.shared_value_ref.write().await = value.ok();
}
}
+9 -8
View File
@@ -105,7 +105,7 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
/// This function represents an outer loop, which in turn calls provided `run_loop` function to do
/// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
/// target or both) and calls `run_loop` again.
pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), String>
pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> anyhow::Result<()>
where
R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
F: 'static + Send + Future<Output = Result<(), FailedClient>>,
@@ -151,8 +151,8 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
pub fn loop_metric<NewLM: Metrics>(
self,
create_metric: impl FnOnce(&Registry, Option<&str>) -> Result<NewLM, PrometheusError>,
) -> Result<LoopMetrics<SC, TC, NewLM>, String> {
let loop_metric = create_metric(&self.registry, self.metrics_prefix.as_deref()).map_err(|e| e.to_string())?;
) -> anyhow::Result<LoopMetrics<SC, TC, NewLM>> {
let loop_metric = create_metric(&self.registry, self.metrics_prefix.as_deref())?;
Ok(LoopMetrics {
relay_loop: self.relay_loop,
@@ -167,13 +167,13 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
pub fn standalone_metric<M: StandaloneMetrics>(
self,
create_metric: impl FnOnce(&Registry, Option<&str>) -> Result<M, PrometheusError>,
) -> Result<Self, String> {
) -> anyhow::Result<Self> {
// since standalone metrics are updating themselves, we may just ignore the fact that the same
// standalone metric is exposed by several loops && only spawn single metric
match create_metric(&self.registry, self.metrics_prefix.as_deref()) {
Ok(standalone_metrics) => standalone_metrics.spawn(),
Err(PrometheusError::AlreadyReg) => (),
Err(e) => return Err(e.to_string()),
Err(e) => anyhow::bail!(e),
}
Ok(self)
@@ -191,13 +191,14 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
/// Expose metrics using address passed at creation.
///
/// If passed `address` is `None`, metrics are not exposed.
pub async fn expose(self) -> Result<Loop<SC, TC, LM>, String> {
pub async fn expose(self) -> anyhow::Result<Loop<SC, TC, LM>> {
if let Some(address) = self.address {
let socket_addr = SocketAddr::new(
address.host.parse().map_err(|err| {
format!(
anyhow::format_err!(
"Invalid host {} is used to expose Prometheus metrics: {}",
address.host, err,
address.host,
err,
)
})?,
address.port,