,
P::TargetHeaderNumber: Decode,
P::TargetHeaderHash: Decode,
R: Send + Sync + MessagesConfig,
@@ -226,7 +237,11 @@ where
Ok(())
}
- async fn activate_target_to_source_headers_relay(&self, _activate: bool) {}
+ async fn require_target_header_on_source(&self, id: TargetHeaderIdOf ) {
+ if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
+ target_to_source_headers_relay.require_finalized_header(id);
+ }
+ }
}
pub async fn read_client_state(
diff --git a/bridges/relays/bin-substrate/src/messages_target.rs b/bridges/relays/bin-substrate/src/messages_target.rs
index fe3f95ca8e..1760832730 100644
--- a/bridges/relays/bin-substrate/src/messages_target.rs
+++ b/bridges/relays/bin-substrate/src/messages_target.rs
@@ -20,6 +20,7 @@
use crate::messages_lane::SubstrateMessageLane;
use crate::messages_source::read_client_state;
+use crate::on_demand_headers::OnDemandHeadersRelay;
use async_trait::async_trait;
use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState};
@@ -45,22 +46,30 @@ pub type SubstrateMessagesReceivingProof = (
);
/// Substrate client as Substrate messages target.
-pub struct SubstrateMessagesTarget {
+pub struct SubstrateMessagesTarget {
client: Client,
lane: P,
lane_id: LaneId,
instance: InstanceId,
+ source_to_target_headers_relay: Option>,
_phantom: PhantomData<(R, I)>,
}
-impl SubstrateMessagesTarget {
+impl SubstrateMessagesTarget {
/// Create new Substrate headers target.
- pub fn new(client: Client, lane: P, lane_id: LaneId, instance: InstanceId) -> Self {
+ pub fn new(
+ client: Client,
+ lane: P,
+ lane_id: LaneId,
+ instance: InstanceId,
+ source_to_target_headers_relay: Option>,
+ ) -> Self {
SubstrateMessagesTarget {
client,
lane,
lane_id,
instance,
+ source_to_target_headers_relay,
_phantom: Default::default(),
}
}
@@ -73,6 +82,7 @@ impl Clone for SubstrateMessagesTarget<
lane: self.lane.clone(),
lane_id: self.lane_id,
instance: self.instance,
+ source_to_target_headers_relay: self.source_to_target_headers_relay.clone(),
_phantom: Default::default(),
}
}
@@ -106,6 +116,7 @@ where
TargetHeaderNumber = ::Number,
TargetHeaderHash = ::Hash,
>,
+ P::SourceChain: Chain,
P::SourceHeaderNumber: Decode,
P::SourceHeaderHash: Decode,
R: Send + Sync + MessagesConfig,
@@ -213,5 +224,9 @@ where
Ok(nonces)
}
- async fn activate_source_to_target_headers_relay(&self, _activate: bool) {}
+ async fn require_source_header_on_target(&self, id: SourceHeaderIdOf ) {
+ if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay {
+ source_to_target_headers_relay.require_finalized_header(id);
+ }
+ }
}
diff --git a/bridges/relays/bin-substrate/src/on_demand_headers.rs b/bridges/relays/bin-substrate/src/on_demand_headers.rs
new file mode 100644
index 0000000000..4c86b6a170
--- /dev/null
+++ b/bridges/relays/bin-substrate/src/on_demand_headers.rs
@@ -0,0 +1,255 @@
+// Copyright 2019-2021 Parity Technologies (UK) Ltd.
+// This file is part of Parity Bridges Common.
+
+// Parity Bridges Common is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity Bridges Common is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity Bridges Common. If not, see .
+
+//! On-demand Substrate -> Substrate headers relay.
+
+use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
+use crate::finality_target::SubstrateFinalityTarget;
+
+use bp_header_chain::justification::GrandpaJustification;
+use finality_relay::TargetClient as FinalityTargetClient;
+use futures::{
+ channel::{mpsc, oneshot},
+ select, FutureExt, StreamExt,
+};
+use num_traits::Zero;
+use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader};
+use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
+use std::fmt::Debug;
+
+/// On-demand Substrate <-> Substrate headers relay.
+///
+/// This relay may be started by messages whenever some other relay (e.g. messages relay) needs more
+/// headers to be relayed to continue its regular work. When enough headers are relayed, on-demand
+/// relay may be deactivated.
+#[derive(Clone)]
+pub struct OnDemandHeadersRelay {
+ /// Background task name.
+ background_task_name: String,
+ /// Required headers to background sender.
+ required_header_tx: mpsc::Sender>,
+}
+
+impl OnDemandHeadersRelay {
+ /// Create new on-demand headers relay.
+ pub fn new(
+ source_client: Client,
+ target_client: Client,
+ pipeline: SubstrateFinalityToSubstrate,
+ ) -> Self
+ where
+ SourceChain: Chain + Debug,
+ SourceChain::BlockNumber: BlockNumberBase,
+ TargetChain: Chain + Debug,
+ TargetChain::BlockNumber: BlockNumberBase,
+ TargetSign: Clone + Send + Sync + 'static,
+ SubstrateFinalityToSubstrate: SubstrateFinalitySyncPipeline<
+ Hash = HashOf,
+ Number = BlockNumberOf,
+ Header = SyncHeader,
+ FinalityProof = GrandpaJustification,
+ TargetChain = TargetChain,
+ >,
+ SubstrateFinalityTarget>:
+ FinalityTargetClient>,
+ {
+ let (required_header_tx, required_header_rx) = mpsc::channel(1);
+ async_std::task::spawn(async move {
+ background_task(source_client, target_client, pipeline, required_header_rx).await;
+ });
+
+ let background_task_name = format!(
+ "{}-background",
+ on_demand_headers_relay_name::()
+ );
+ OnDemandHeadersRelay {
+ background_task_name,
+ required_header_tx,
+ }
+ }
+
+ /// Someone is asking us to relay given finalized header.
+ pub fn require_finalized_header(&self, header_id: HeaderIdOf) {
+ if let Err(error) = self.required_header_tx.clone().try_send(header_id) {
+ log::error!(
+ target: "bridge",
+ "Failed to send require header id {:?} to {:?}: {:?}",
+ header_id,
+ self.background_task_name,
+ error,
+ );
+ }
+ }
+}
+
+/// Background task that is responsible for starting and stopping headers relay when required.
+async fn background_task(
+ source_client: Client,
+ target_client: Client,
+ pipeline: SubstrateFinalityToSubstrate,
+ mut required_header_rx: mpsc::Receiver>,
+) where
+ SourceChain: Chain + Debug,
+ SourceChain::BlockNumber: BlockNumberBase,
+ TargetChain: Chain + Debug,
+ TargetChain::BlockNumber: BlockNumberBase,
+ TargetSign: Clone + Send + Sync + 'static,
+ SubstrateFinalityToSubstrate: SubstrateFinalitySyncPipeline<
+ Hash = HashOf,
+ Number = BlockNumberOf,
+ Header = SyncHeader,
+ FinalityProof = GrandpaJustification,
+ TargetChain = TargetChain,
+ >,
+ SubstrateFinalityTarget>:
+ FinalityTargetClient>,
+{
+ let relay_task_name = on_demand_headers_relay_name::();
+ let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
+
+ let mut active_headers_relay = None;
+ let mut required_header_number = Zero::zero();
+ let mut relay_exited_rx = futures::future::pending().left_future();
+
+ loop {
+ // wait for next target block or for new required header
+ select! {
+ _ = async_std::task::sleep(TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
+ required_header_id = required_header_rx.next() => {
+ match required_header_id {
+ Some(required_header_id) => {
+ if required_header_id.0 > required_header_number {
+ required_header_number = required_header_id.0;
+ }
+ },
+ None => {
+ // that's the only way to exit background task - to drop `required_header_tx`
+ break
+ },
+ }
+ },
+ _ = relay_exited_rx => {
+ // there could be a situation when we're receiving exit signals after we
+ // have already stopped relay or when we have already started new relay.
+ // but it isn't critical, because even if we'll accidentally stop new relay
+ // we'll restart it almost immediately
+ stop_on_demand_headers_relay(active_headers_relay.take()).await;
+ },
+ }
+
+ // read best finalized source block from target
+ let available_header_number = match finality_target.best_finalized_source_block_number().await {
+ Ok(available_header_number) => available_header_number,
+ Err(error) => {
+ log::error!(
+ target: "bridge",
+ "Failed to read best finalized {} header from {} in {} relay: {:?}",
+ SourceChain::NAME,
+ TargetChain::NAME,
+ relay_task_name,
+ error,
+ );
+
+ // we don't know what's happening with target client, so better to stop on-demand relay than
+ // submit unneeded transactions
+ // => assume that required header is known to the target node
+ required_header_number
+ }
+ };
+
+ // start or stop headers relay if required
+ let activate = required_header_number > available_header_number;
+ match (activate, active_headers_relay.is_some()) {
+ (true, false) => {
+ let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
+ active_headers_relay = start_on_demand_headers_relay(
+ relay_task_name.clone(),
+ relay_exited_tx,
+ source_client.clone(),
+ target_client.clone(),
+ pipeline.clone(),
+ );
+ if active_headers_relay.is_some() {
+ relay_exited_rx = new_relay_exited_rx.right_future();
+ }
+ }
+ (false, true) => {
+ stop_on_demand_headers_relay(active_headers_relay.take()).await;
+ }
+ _ => (),
+ }
+ }
+}
+
+/// On-demand headers relay task name.
+fn on_demand_headers_relay_name() -> String {
+ format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
+}
+
+/// Start on-demand headers relay task.
+fn start_on_demand_headers_relay(
+ task_name: String,
+ relay_exited_tx: oneshot::Sender<()>,
+ source_client: Client,
+ target_client: Client,
+ pipeline: SubstrateFinalityToSubstrate,
+) -> Option>
+where
+ SourceChain::BlockNumber: BlockNumberBase,
+ SubstrateFinalityToSubstrate: SubstrateFinalitySyncPipeline<
+ Hash = HashOf,
+ Number = BlockNumberOf,
+ Header = SyncHeader,
+ FinalityProof = GrandpaJustification,
+ TargetChain = TargetChain,
+ >,
+ TargetSign: 'static,
+{
+ let headers_relay_future =
+ crate::finality_pipeline::run(pipeline, source_client, target_client, MetricsParams::disabled());
+ let closure_task_name = task_name.clone();
+ async_std::task::Builder::new()
+ .name(task_name.clone())
+ .spawn(async move {
+ log::info!(target: "bridge", "Starting {} headers relay", closure_task_name);
+ let result = headers_relay_future.await;
+ log::trace!(target: "bridge", "{} headers relay has exited. Result: {:?}", closure_task_name, result);
+ let _ = relay_exited_tx.send(());
+ })
+ .map_err(|error| {
+ log::error!(
+ target: "bridge",
+ "Failed to start {} relay: {:?}",
+ task_name,
+ error,
+ );
+ })
+ .ok()
+}
+
+/// Stop on-demand headers relay task.
+async fn stop_on_demand_headers_relay(task: Option>) {
+ if let Some(task) = task {
+ let task_name = task
+ .task()
+ .name()
+ .expect("on-demand tasks are always started with name; qed")
+ .to_string();
+ log::trace!(target: "bridge", "Cancelling {} headers relay", task_name);
+ task.cancel().await;
+ log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
+ }
+}
diff --git a/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs b/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs
index 31a0061819..d96fa7b797 100644
--- a/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs
+++ b/bridges/relays/bin-substrate/src/rialto_millau/millau_messages_to_rialto.rs
@@ -16,12 +16,13 @@
//! Millau-to-Rialto messages sync entrypoint.
-use super::{MillauClient, RialtoClient};
-use crate::messages_lane::{select_delivery_transaction_limits, SubstrateMessageLane, SubstrateMessageLaneToSubstrate};
+use crate::messages_lane::{
+ select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
+};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
-use bp_messages::{LaneId, MessageNonce};
+use bp_messages::MessageNonce;
use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE};
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::Encode;
@@ -33,12 +34,12 @@ use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
-use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
/// Millau-to-Rialto message lane.
-type MillauMessagesToRialto = SubstrateMessageLaneToSubstrate;
+pub type MillauMessagesToRialto =
+ SubstrateMessageLaneToSubstrate;
impl SubstrateMessageLane for MillauMessagesToRialto {
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str =
@@ -143,21 +144,18 @@ type RialtoTargetClient = SubstrateMessagesTarget<
/// Run Millau-to-Rialto messages sync.
pub async fn run(
- millau_client: MillauClient,
- millau_sign: MillauSigningParams,
- rialto_client: RialtoClient,
- rialto_sign: RialtoSigningParams,
- lane_id: LaneId,
- metrics_params: MetricsParams,
+ params: MessagesRelayParams,
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
- let relayer_id_at_millau = (*millau_sign.public().as_array_ref()).into();
+ let relayer_id_at_millau = (*params.source_sign.public().as_array_ref()).into();
+ let lane_id = params.lane_id;
+ let source_client = params.source_client;
let lane = MillauMessagesToRialto {
- source_client: millau_client.clone(),
- source_sign: millau_sign,
- target_client: rialto_client.clone(),
- target_sign: rialto_sign,
+ source_client: source_client.clone(),
+ source_sign: params.source_sign,
+ target_client: params.target_client.clone(),
+ target_sign: params.target_sign,
relayer_id_at_source: relayer_id_at_millau,
};
@@ -198,24 +196,48 @@ pub async fn run(
max_messages_size_in_single_batch,
},
},
- MillauSourceClient::new(millau_client.clone(), lane.clone(), lane_id, RIALTO_BRIDGE_INSTANCE),
- RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE),
+ MillauSourceClient::new(
+ source_client.clone(),
+ lane.clone(),
+ lane_id,
+ RIALTO_BRIDGE_INSTANCE,
+ params.target_to_source_headers_relay,
+ ),
+ RialtoTargetClient::new(
+ params.target_client,
+ lane,
+ lane_id,
+ MILLAU_BRIDGE_INSTANCE,
+ params.source_to_target_headers_relay,
+ ),
relay_utils::relay_metrics(
- messages_relay::message_lane_loop::metrics_prefix::(&lane_id),
- metrics_params.address,
+ Some(messages_relay::message_lane_loop::metrics_prefix::<
+ MillauMessagesToRialto,
+ >(&lane_id)),
+ params.metrics_params,
)
- .standalone_metric(StorageProofOverheadMetric::new(
- millau_client.clone(),
- "millau_storage_proof_overhead".into(),
- "Millau storage proof overhead".into(),
- ))?
- .standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
- millau_client,
- sp_core::storage::StorageKey(millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec()),
- Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE),
- "millau_rialto_to_millau_conversion_rate".into(),
- "Rialto to Millau tokens conversion rate (used by Rialto)".into(),
- ))?
+ .standalone_metric(|registry, prefix| {
+ StorageProofOverheadMetric::new(
+ registry,
+ prefix,
+ source_client.clone(),
+ "millau_storage_proof_overhead".into(),
+ "Millau storage proof overhead".into(),
+ )
+ })?
+ .standalone_metric(|registry, prefix| {
+ FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
+ registry,
+ prefix,
+ source_client,
+ sp_core::storage::StorageKey(
+ millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec(),
+ ),
+ Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE),
+ "millau_rialto_to_millau_conversion_rate".into(),
+ "Rialto to Millau tokens conversion rate (used by Rialto)".into(),
+ )
+ })?
.into_params(),
futures::future::pending(),
)
diff --git a/bridges/relays/bin-substrate/src/rialto_millau/mod.rs b/bridges/relays/bin-substrate/src/rialto_millau/mod.rs
index a04cd55f7d..af17f62a05 100644
--- a/bridges/relays/bin-substrate/src/rialto_millau/mod.rs
+++ b/bridges/relays/bin-substrate/src/rialto_millau/mod.rs
@@ -22,11 +22,6 @@ pub mod rialto_headers_to_millau;
pub mod rialto_messages_to_millau;
pub mod westend_headers_to_millau;
-/// Millau node client.
-pub type MillauClient = relay_substrate_client::Client;
-/// Rialto node client.
-pub type RialtoClient = relay_substrate_client::Client;
-
use crate::cli::{
bridge,
encode_call::{self, Call, CliEncodeCall},
diff --git a/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs b/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs
index 3b11d998b3..ec39a4caa3 100644
--- a/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs
+++ b/bridges/relays/bin-substrate/src/rialto_millau/rialto_messages_to_millau.rs
@@ -16,12 +16,13 @@
//! Rialto-to-Millau messages sync entrypoint.
-use super::{MillauClient, RialtoClient};
-use crate::messages_lane::{select_delivery_transaction_limits, SubstrateMessageLane, SubstrateMessageLaneToSubstrate};
+use crate::messages_lane::{
+ select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
+};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
-use bp_messages::{LaneId, MessageNonce};
+use bp_messages::MessageNonce;
use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE};
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::Encode;
@@ -33,12 +34,12 @@ use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
-use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
/// Rialto-to-Millau message lane.
-type RialtoMessagesToMillau = SubstrateMessageLaneToSubstrate;
+pub type RialtoMessagesToMillau =
+ SubstrateMessageLaneToSubstrate;
impl SubstrateMessageLane for RialtoMessagesToMillau {
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str =
@@ -143,21 +144,18 @@ type MillauTargetClient = SubstrateMessagesTarget<
/// Run Rialto-to-Millau messages sync.
pub async fn run(
- rialto_client: RialtoClient,
- rialto_sign: RialtoSigningParams,
- millau_client: MillauClient,
- millau_sign: MillauSigningParams,
- lane_id: LaneId,
- metrics_params: MetricsParams,
+ params: MessagesRelayParams,
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
- let relayer_id_at_rialto = (*rialto_sign.public().as_array_ref()).into();
+ let relayer_id_at_rialto = (*params.source_sign.public().as_array_ref()).into();
+ let lane_id = params.lane_id;
+ let source_client = params.source_client;
let lane = RialtoMessagesToMillau {
- source_client: rialto_client.clone(),
- source_sign: rialto_sign,
- target_client: millau_client.clone(),
- target_sign: millau_sign,
+ source_client: source_client.clone(),
+ source_sign: params.source_sign,
+ target_client: params.target_client.clone(),
+ target_sign: params.target_sign,
relayer_id_at_source: relayer_id_at_rialto,
};
@@ -197,24 +195,48 @@ pub async fn run(
max_messages_size_in_single_batch,
},
},
- RialtoSourceClient::new(rialto_client.clone(), lane.clone(), lane_id, MILLAU_BRIDGE_INSTANCE),
- MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE),
+ RialtoSourceClient::new(
+ source_client.clone(),
+ lane.clone(),
+ lane_id,
+ MILLAU_BRIDGE_INSTANCE,
+ params.target_to_source_headers_relay,
+ ),
+ MillauTargetClient::new(
+ params.target_client,
+ lane,
+ lane_id,
+ RIALTO_BRIDGE_INSTANCE,
+ params.source_to_target_headers_relay,
+ ),
relay_utils::relay_metrics(
- messages_relay::message_lane_loop::metrics_prefix::(&lane_id),
- metrics_params.address,
+ Some(messages_relay::message_lane_loop::metrics_prefix::<
+ RialtoMessagesToMillau,
+ >(&lane_id)),
+ params.metrics_params,
)
- .standalone_metric(StorageProofOverheadMetric::new(
- rialto_client.clone(),
- "rialto_storage_proof_overhead".into(),
- "Rialto storage proof overhead".into(),
- ))?
- .standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
- rialto_client,
- sp_core::storage::StorageKey(rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec()),
- Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE),
- "rialto_millau_to_rialto_conversion_rate".into(),
- "Millau to Rialto tokens conversion rate (used by Millau)".into(),
- ))?
+ .standalone_metric(|registry, prefix| {
+ StorageProofOverheadMetric::new(
+ registry,
+ prefix,
+ source_client.clone(),
+ "rialto_storage_proof_overhead".into(),
+ "Rialto storage proof overhead".into(),
+ )
+ })?
+ .standalone_metric(|registry, prefix| {
+ FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
+ registry,
+ prefix,
+ source_client,
+ sp_core::storage::StorageKey(
+ rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec(),
+ ),
+ Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE),
+ "rialto_millau_to_rialto_conversion_rate".into(),
+ "Millau to Rialto tokens conversion rate (used by Millau)".into(),
+ )
+ })?
.into_params(),
futures::future::pending(),
)
diff --git a/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs b/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs
index c6f9a1fd3d..f8f023da19 100644
--- a/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs
+++ b/bridges/relays/bin-substrate/src/rialto_millau/westend_headers_to_millau.rs
@@ -36,22 +36,30 @@ impl SubstrateFinalitySyncPipeline for WestendFinalityToMillau {
fn customize_metrics(params: MetricsParams) -> anyhow::Result {
Ok(
- relay_utils::relay_metrics(finality_relay::metrics_prefix::(), params.address)
+ relay_utils::relay_metrics(Some(finality_relay::metrics_prefix::()), params)
// Polkadot/Kusama prices are added as metrics here, because atm we don't have Polkadot <-> Kusama
// relays, but we want to test metrics/dashboards in advance
- .standalone_metric(FloatJsonValueMetric::new(
- "https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=usd".into(),
- "$.polkadot.usd".into(),
- "polkadot_price".into(),
- "Polkadot price in USD".into(),
- ))
+ .standalone_metric(|registry, prefix| {
+ FloatJsonValueMetric::new(
+ registry,
+ prefix,
+ "https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=usd".into(),
+ "$.polkadot.usd".into(),
+ "polkadot_price".into(),
+ "Polkadot price in USD".into(),
+ )
+ })
.map_err(|e| anyhow::format_err!("{}", e))?
- .standalone_metric(FloatJsonValueMetric::new(
- "https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=usd".into(),
- "$.kusama.usd".into(),
- "kusama_price".into(),
- "Kusama price in USD".into(),
- ))
+ .standalone_metric(|registry, prefix| {
+ FloatJsonValueMetric::new(
+ registry,
+ prefix,
+ "https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=usd".into(),
+ "$.kusama.usd".into(),
+ "kusama_price".into(),
+ "Kusama price in USD".into(),
+ )
+ })
.map_err(|e| anyhow::format_err!("{}", e))?
.into_params(),
)
diff --git a/bridges/relays/client-substrate/src/metrics/float_storage_value.rs b/bridges/relays/client-substrate/src/metrics/float_storage_value.rs
index e1647d4bcd..f3ba8988ee 100644
--- a/bridges/relays/client-substrate/src/metrics/float_storage_value.rs
+++ b/bridges/relays/client-substrate/src/metrics/float_storage_value.rs
@@ -19,7 +19,7 @@ use crate::client::Client;
use async_trait::async_trait;
use codec::Decode;
-use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64};
+use relay_utils::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, F64};
use sp_core::storage::StorageKey;
use sp_runtime::{traits::UniqueSaturatedInto, FixedPointNumber};
use std::time::Duration;
@@ -39,29 +39,20 @@ pub struct FloatStorageValueMetric {
impl FloatStorageValueMetric {
/// Create new metric.
pub fn new(
+ registry: &Registry,
+ prefix: Option<&str>,
client: Client,
storage_key: StorageKey,
maybe_default_value: Option,
name: String,
help: String,
- ) -> Self {
- FloatStorageValueMetric {
+ ) -> Result {
+ Ok(FloatStorageValueMetric {
client,
storage_key,
maybe_default_value,
- metric: Gauge::new(name, help).expect(
- "only fails if gauge options are customized;\
- we use default options;\
- qed",
- ),
- }
- }
-}
-
-impl Metrics for FloatStorageValueMetric {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
+ metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
+ })
}
}
diff --git a/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs b/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs
index e440683f52..526fe1e048 100644
--- a/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs
+++ b/bridges/relays/client-substrate/src/metrics/storage_proof_overhead.rs
@@ -19,7 +19,7 @@ use crate::client::Client;
use crate::error::Error;
use async_trait::async_trait;
-use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, U64};
+use relay_utils::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, U64};
use sp_core::storage::StorageKey;
use sp_runtime::traits::Header as HeaderT;
use sp_storage::well_known_keys::CODE;
@@ -49,15 +49,17 @@ impl Clone for StorageProofOverheadMetric {
impl StorageProofOverheadMetric {
/// Create new metric instance with given name and help.
- pub fn new(client: Client, name: String, help: String) -> Self {
- StorageProofOverheadMetric {
+ pub fn new(
+ registry: &Registry,
+ prefix: Option<&str>,
+ client: Client,
+ name: String,
+ help: String,
+ ) -> Result {
+ Ok(StorageProofOverheadMetric {
client,
- metric: Gauge::new(name, help).expect(
- "only fails if gauge options are customized;\
- we use default options;\
- qed",
- ),
- }
+ metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
+ })
}
/// Returns approximate storage proof size overhead.
@@ -85,13 +87,6 @@ impl StorageProofOverheadMetric {
}
}
-impl Metrics for StorageProofOverheadMetric {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
- }
-}
-
#[async_trait]
impl StandaloneMetrics for StorageProofOverheadMetric {
fn update_interval(&self) -> Duration {
diff --git a/bridges/relays/exchange/src/exchange_loop.rs b/bridges/relays/exchange/src/exchange_loop.rs
index 7bb4abb746..b46d34e047 100644
--- a/bridges/relays/exchange/src/exchange_loop.rs
+++ b/bridges/relays/exchange/src/exchange_loop.rs
@@ -78,6 +78,11 @@ impl TransactionProofsRelayStorage for InMemoryStorag
}
}
+/// Return prefix that will be used by default to expose Prometheus metrics of the exchange loop.
+pub fn metrics_prefix() -> String {
+ format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME)
+}
+
/// Run proofs synchronization.
pub async fn run(
storage: impl TransactionProofsRelayStorage>,
@@ -89,12 +94,9 @@ pub async fn run(
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
- .with_metrics(
- format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
- metrics_params,
- )
- .loop_metric(ExchangeLoopMetrics::default())?
- .standalone_metric(GlobalMetrics::default())?
+ .with_metrics(Some(metrics_prefix::()), metrics_params)
+ .loop_metric(|registry, prefix| ExchangeLoopMetrics::new(registry, prefix))?
+ .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
diff --git a/bridges/relays/exchange/src/exchange_loop_metrics.rs b/bridges/relays/exchange/src/exchange_loop_metrics.rs
index 320f662c6d..82d3e649d4 100644
--- a/bridges/relays/exchange/src/exchange_loop_metrics.rs
+++ b/bridges/relays/exchange/src/exchange_loop_metrics.rs
@@ -17,7 +17,9 @@
//! Metrics for currency-exchange relay loop.
use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
-use relay_utils::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
+use relay_utils::metrics::{
+ metric_name, register, Counter, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64,
+};
/// Exchange transactions relay metrics.
#[derive(Clone)]
@@ -30,31 +32,38 @@ pub struct ExchangeLoopMetrics {
processed_transactions: CounterVec,
}
-impl Metrics for ExchangeLoopMetrics {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
- register(self.processed_blocks.clone(), registry).map_err(|e| e.to_string())?;
- register(self.processed_transactions.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
- }
-}
-
-impl Default for ExchangeLoopMetrics {
- fn default() -> Self {
- ExchangeLoopMetrics {
- best_block_numbers: GaugeVec::new(
- Opts::new("best_block_numbers", "Best finalized block numbers"),
- &["type"],
- )
- .expect("metric is static and thus valid; qed"),
- processed_blocks: Counter::new("processed_blocks", "Total number of processed blocks")
- .expect("metric is static and thus valid; qed"),
- processed_transactions: CounterVec::new(
- Opts::new("processed_transactions", "Total number of processed transactions"),
- &["type"],
- )
- .expect("metric is static and thus valid; qed"),
- }
+impl ExchangeLoopMetrics {
+ /// Create and register exchange loop metrics.
+ pub fn new(registry: &Registry, prefix: Option<&str>) -> Result {
+ Ok(ExchangeLoopMetrics {
+ best_block_numbers: register(
+ GaugeVec::new(
+ Opts::new(
+ metric_name(prefix, "best_block_numbers"),
+ "Best finalized block numbers",
+ ),
+ &["type"],
+ )?,
+ registry,
+ )?,
+ processed_blocks: register(
+ Counter::new(
+ metric_name(prefix, "processed_blocks"),
+ "Total number of processed blocks",
+ )?,
+ registry,
+ )?,
+ processed_transactions: register(
+ CounterVec::new(
+ Opts::new(
+ metric_name(prefix, "processed_transactions"),
+ "Total number of processed transactions",
+ ),
+ &["type"],
+ )?,
+ registry,
+ )?,
+ })
}
}
diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs
index e0ad8a7c1b..aff32e46de 100644
--- a/bridges/relays/finality/src/finality_loop.rs
+++ b/bridges/relays/finality/src/finality_loop.rs
@@ -105,9 +105,9 @@ pub async fn run(
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
- .with_metrics(metrics_prefix::(), metrics_params)
- .loop_metric(SyncLoopMetrics::default())?
- .standalone_metric(GlobalMetrics::default())?
+ .with_metrics(Some(metrics_prefix::
()), metrics_params)
+ .loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
+ .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
diff --git a/bridges/relays/headers/src/sync_loop.rs b/bridges/relays/headers/src/sync_loop.rs
index d193996d0c..e4f1b7b045 100644
--- a/bridges/relays/headers/src/sync_loop.rs
+++ b/bridges/relays/headers/src/sync_loop.rs
@@ -110,6 +110,11 @@ pub trait SyncMaintain: Clone + Send + Sync {
impl SyncMaintain for () {}
+/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop.
+pub fn metrics_prefix() -> String {
+ format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
+}
+
/// Run headers synchronization.
#[allow(clippy::too_many_arguments)]
pub async fn run>(
@@ -124,9 +129,9 @@ pub async fn run>(
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
- .with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), metrics_params)
- .loop_metric(SyncLoopMetrics::default())?
- .standalone_metric(GlobalMetrics::default())?
+ .with_metrics(Some(metrics_prefix::()), metrics_params)
+ .loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
+ .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
diff --git a/bridges/relays/headers/src/sync_loop_metrics.rs b/bridges/relays/headers/src/sync_loop_metrics.rs
index 616a04360d..37dae11340 100644
--- a/bridges/relays/headers/src/sync_loop_metrics.rs
+++ b/bridges/relays/headers/src/sync_loop_metrics.rs
@@ -20,7 +20,7 @@ use crate::sync::HeadersSync;
use crate::sync_types::{HeaderStatus, HeadersSyncPipeline};
use num_traits::Zero;
-use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
+use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusError, Registry, U64};
/// Headers sync metrics.
#[derive(Clone)]
@@ -31,28 +31,31 @@ pub struct SyncLoopMetrics {
blocks_in_state: GaugeVec,
}
-impl Metrics for SyncLoopMetrics {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
- register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
- }
-}
-
-impl Default for SyncLoopMetrics {
- fn default() -> Self {
- SyncLoopMetrics {
- best_block_numbers: GaugeVec::new(
- Opts::new("best_block_numbers", "Best block numbers on source and target nodes"),
- &["node"],
- )
- .expect("metric is static and thus valid; qed"),
- blocks_in_state: GaugeVec::new(
- Opts::new("blocks_in_state", "Number of blocks in given state"),
- &["state"],
- )
- .expect("metric is static and thus valid; qed"),
- }
+impl SyncLoopMetrics {
+ /// Create and register headers loop metrics.
+ pub fn new(registry: &Registry, prefix: Option<&str>) -> Result {
+ Ok(SyncLoopMetrics {
+ best_block_numbers: register(
+ GaugeVec::new(
+ Opts::new(
+ metric_name(prefix, "best_block_numbers"),
+ "Best block numbers on source and target nodes",
+ ),
+ &["node"],
+ )?,
+ registry,
+ )?,
+ blocks_in_state: register(
+ GaugeVec::new(
+ Opts::new(
+ metric_name(prefix, "blocks_in_state"),
+ "Number of blocks in given state",
+ ),
+ &["state"],
+ )?,
+ registry,
+ )?,
+ })
}
}
diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs
index daa74b5073..41eee606d8 100644
--- a/bridges/relays/messages/src/message_lane_loop.rs
+++ b/bridges/relays/messages/src/message_lane_loop.rs
@@ -140,8 +140,8 @@ pub trait SourceClient: RelayClient {
proof: P::MessagesReceivingProof,
) -> Result<(), Self::Error>;
- /// Activate (or deactivate) headers relay that relays target headers to source node.
- async fn activate_target_to_source_headers_relay(&self, activate: bool);
+ /// We need given finalized target header on source to continue synchronization.
+ async fn require_target_header_on_source(&self, id: TargetHeaderIdOf);
}
/// Target client trait.
@@ -181,8 +181,8 @@ pub trait TargetClient: RelayClient {
proof: P::MessagesProof,
) -> Result, Self::Error>;
- /// Activate (or deactivate) headers relay that relays source headers to target node.
- async fn activate_source_to_target_headers_relay(&self, activate: bool);
+ /// We need given finalized source header on target to continue synchronization.
+ async fn require_source_header_on_target(&self, id: SourceHeaderIdOf);
}
/// State of the client.
@@ -232,9 +232,9 @@ pub async fn run(
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.reconnect_delay(params.reconnect_delay)
- .with_metrics(metrics_prefix::(¶ms.lane), metrics_params)
- .loop_metric(MessageLaneLoopMetrics::default())?
- .standalone_metric(GlobalMetrics::default())?
+ .with_metrics(Some(metrics_prefix::
(¶ms.lane)), metrics_params)
+ .loop_metric(|registry, prefix| MessageLaneLoopMetrics::new(registry, prefix))?
+ .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
@@ -475,8 +475,10 @@ pub(crate) mod tests {
target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce,
submitted_messages_proofs: Vec,
- is_target_to_source_headers_relay_activated: bool,
- is_source_to_target_headers_relay_activated: bool,
+ target_to_source_header_required: Option,
+ target_to_source_header_requirements: Vec,
+ source_to_target_header_required: Option,
+ source_to_target_header_requirements: Vec,
}
#[derive(Clone)]
@@ -582,9 +584,10 @@ pub(crate) mod tests {
Ok(())
}
- async fn activate_target_to_source_headers_relay(&self, activate: bool) {
+ async fn require_target_header_on_source(&self, id: TargetHeaderIdOf) {
let mut data = self.data.lock();
- data.is_target_to_source_headers_relay_activated = activate;
+ data.target_to_source_header_required = Some(id);
+ data.target_to_source_header_requirements.push(id);
(self.tick)(&mut *data);
}
}
@@ -686,9 +689,10 @@ pub(crate) mod tests {
Ok(nonces)
}
- async fn activate_source_to_target_headers_relay(&self, activate: bool) {
+ async fn require_source_header_on_target(&self, id: SourceHeaderIdOf) {
let mut data = self.data.lock();
- data.is_source_to_target_headers_relay_activated = activate;
+ data.source_to_target_header_required = Some(id);
+ data.source_to_target_header_requirements.push(id);
(self.tick)(&mut *data);
}
}
@@ -806,16 +810,16 @@ pub(crate) mod tests {
},
Arc::new(|data: &mut TestClientData| {
// headers relay must only be started when we need new target headers at source node
- if data.is_target_to_source_headers_relay_activated {
+ if data.target_to_source_header_required.is_some() {
assert!(data.source_state.best_finalized_peer_at_best_self.0 < data.target_state.best_self.0);
- data.is_target_to_source_headers_relay_activated = false;
+ data.target_to_source_header_required = None;
}
}),
Arc::new(move |data: &mut TestClientData| {
// headers relay must only be started when we need new source headers at target node
- if data.is_target_to_source_headers_relay_activated {
+ if data.source_to_target_header_required.is_some() {
assert!(data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_self.0);
- data.is_target_to_source_headers_relay_activated = false;
+ data.source_to_target_header_required = None;
}
// syncing source headers -> target chain (all at once)
if data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_finalized_self.0 {
@@ -837,7 +841,7 @@ pub(crate) mod tests {
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
}
- // if source has received all messages receiving confirmations => increase source block so that confirmations may be sent
+ // if source has received all messages receiving confirmations => stop
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
@@ -853,5 +857,9 @@ pub(crate) mod tests {
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
+
+ // check that we have at least once required new source->target or target->source headers
+ assert!(!result.target_to_source_header_requirements.is_empty());
+ assert!(!result.source_to_target_header_requirements.is_empty());
}
}
diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs
index 3b1644b728..225c59f23c 100644
--- a/bridges/relays/messages/src/message_race_delivery.rs
+++ b/bridges/relays/messages/src/message_race_delivery.rs
@@ -166,8 +166,8 @@ where
type Error = C::Error;
type TargetNoncesData = DeliveryRaceTargetNoncesData;
- async fn require_more_source_headers(&self, activate: bool) {
- self.client.activate_source_to_target_headers_relay(activate).await
+ async fn require_source_header(&self, id: SourceHeaderIdOf) {
+ self.client.require_source_header_on_target(id).await
}
async fn nonces(
@@ -291,6 +291,10 @@ impl RaceStrategy, TargetHeaderIdOf, P::M
self.strategy.is_empty()
}
+ fn required_source_header_at_target(&self, current_best: &SourceHeaderIdOf
) -> Option> {
+ self.strategy.required_source_header_at_target(current_best)
+ }
+
fn best_at_source(&self) -> Option {
self.strategy.best_at_source()
}
diff --git a/bridges/relays/messages/src/message_race_loop.rs b/bridges/relays/messages/src/message_race_loop.rs
index f3bf2626ef..41f5ede103 100644
--- a/bridges/relays/messages/src/message_race_loop.rs
+++ b/bridges/relays/messages/src/message_race_loop.rs
@@ -123,8 +123,9 @@ pub trait TargetClient {
/// Type of the additional data from the target client, used by the race.
type TargetNoncesData: std::fmt::Debug;
- /// Ask headers relay to relay more headers from race source to race target.
- async fn require_more_source_headers(&self, activate: bool);
+ /// Ask headers relay to relay finalized headers up to (and including) given header
+ /// from race source to race target.
+ async fn require_source_header(&self, id: P::SourceHeaderId);
/// Return nonces that are known to the target client.
async fn nonces(
@@ -152,6 +153,8 @@ pub trait RaceStrategy: Debug {
/// Should return true if nothing has to be synced.
fn is_empty(&self) -> bool;
+ /// Return id of source header that is required to be on target to continue synchronization.
+ fn required_source_header_at_target(&self, current_best: &SourceHeaderId) -> Option;
/// Return best nonce at source node.
///
/// `Some` is returned only if we are sure that the value is greater or equal
@@ -219,7 +222,6 @@ pub async fn run, TC: TargetClient>(
TargetNoncesData = TC::TargetNoncesData,
>,
) -> Result<(), FailedClient> {
- let mut is_strategy_empty = true;
let mut progress_context = Instant::now();
let mut race_state = RaceState::default();
let mut stall_countdown = Instant::now();
@@ -307,6 +309,15 @@ pub async fn run, TC: TargetClient>(
async_std::task::sleep,
|| format!("Error retrieving nonces from {}", P::source_name()),
).fail_if_connection_error(FailedClient::Source)?;
+
+ // ask for more headers if we have nonces to deliver and required headers are missing
+ let required_source_header_id = race_state
+ .best_finalized_source_header_id_at_best_target
+ .as_ref()
+ .and_then(|best|strategy.required_source_header_at_target(best));
+ if let Some(required_source_header_id) = required_source_header_id {
+ race_target.require_source_header(required_source_header_id).await;
+ }
},
nonces = target_best_nonces => {
target_best_nonces_required = false;
@@ -408,13 +419,6 @@ pub async fn run, TC: TargetClient>(
progress_context = print_race_progress::
(progress_context, &strategy);
- // ask for more headers if we have nonces to deliver
- let prev_is_strategy_empty = is_strategy_empty;
- is_strategy_empty = strategy.is_empty();
- if is_strategy_empty != prev_is_strategy_empty {
- race_target.require_more_source_headers(!is_strategy_empty).await;
- }
-
if stall_countdown.elapsed() > stall_timeout {
log::warn!(
target: "bridge",
diff --git a/bridges/relays/messages/src/message_race_receiving.rs b/bridges/relays/messages/src/message_race_receiving.rs
index 6538061965..4381b63591 100644
--- a/bridges/relays/messages/src/message_race_receiving.rs
+++ b/bridges/relays/messages/src/message_race_receiving.rs
@@ -159,8 +159,8 @@ where
type Error = C::Error;
type TargetNoncesData = ();
- async fn require_more_source_headers(&self, activate: bool) {
- self.client.activate_target_to_source_headers_relay(activate).await
+ async fn require_source_header(&self, id: TargetHeaderIdOf
) {
+ self.client.require_target_header_on_source(id).await
}
async fn nonces(
diff --git a/bridges/relays/messages/src/message_race_strategy.rs b/bridges/relays/messages/src/message_race_strategy.rs
index 2d8e90d433..7088f8d74b 100644
--- a/bridges/relays/messages/src/message_race_strategy.rs
+++ b/bridges/relays/messages/src/message_race_strategy.rs
@@ -162,6 +162,15 @@ where
self.source_queue.is_empty()
}
+ fn required_source_header_at_target(
+ &self,
+ current_best: &HeaderId,
+ ) -> Option> {
+ self.source_queue
+ .back()
+ .and_then(|(h, _)| if h.0 > current_best.0 { Some(h.clone()) } else { None })
+ }
+
fn best_at_source(&self) -> Option {
let best_in_queue = self.source_queue.back().map(|(_, range)| range.end());
match (best_in_queue, self.best_target_nonce) {
diff --git a/bridges/relays/messages/src/metrics.rs b/bridges/relays/messages/src/metrics.rs
index a9a0ac4081..51a4118be8 100644
--- a/bridges/relays/messages/src/metrics.rs
+++ b/bridges/relays/messages/src/metrics.rs
@@ -20,7 +20,7 @@ use crate::message_lane::MessageLane;
use crate::message_lane_loop::{SourceClientState, TargetClientState};
use bp_messages::MessageNonce;
-use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
+use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusError, Registry, U64};
/// Message lane relay metrics.
///
@@ -34,25 +34,28 @@ pub struct MessageLaneLoopMetrics {
lane_state_nonces: GaugeVec,
}
-impl Metrics for MessageLaneLoopMetrics {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
- register(self.lane_state_nonces.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
- }
-}
-
-impl Default for MessageLaneLoopMetrics {
- fn default() -> Self {
- MessageLaneLoopMetrics {
- best_block_numbers: GaugeVec::new(
- Opts::new("best_block_numbers", "Best finalized block numbers"),
- &["type"],
- )
- .expect("metric is static and thus valid; qed"),
- lane_state_nonces: GaugeVec::new(Opts::new("lane_state_nonces", "Nonces of the lane state"), &["type"])
- .expect("metric is static and thus valid; qed"),
- }
+impl MessageLaneLoopMetrics {
+ /// Create and register messages loop metrics.
+ pub fn new(registry: &Registry, prefix: Option<&str>) -> Result {
+ Ok(MessageLaneLoopMetrics {
+ best_block_numbers: register(
+ GaugeVec::new(
+ Opts::new(
+ metric_name(prefix, "best_block_numbers"),
+ "Best finalized block numbers",
+ ),
+ &["type"],
+ )?,
+ registry,
+ )?,
+ lane_state_nonces: register(
+ GaugeVec::new(
+ Opts::new(metric_name(prefix, "lane_state_nonces"), "Nonces of the lane state"),
+ &["type"],
+ )?,
+ registry,
+ )?,
+ })
}
}
diff --git a/bridges/relays/utils/src/metrics.rs b/bridges/relays/utils/src/metrics.rs
index 789ff8a108..c0eaeae337 100644
--- a/bridges/relays/utils/src/metrics.rs
+++ b/bridges/relays/utils/src/metrics.rs
@@ -18,7 +18,7 @@ pub use float_json_value::FloatJsonValueMetric;
pub use global::GlobalMetrics;
pub use substrate_prometheus_endpoint::{
prometheus::core::{Atomic, Collector},
- register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64,
+ register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, U64,
};
use async_trait::async_trait;
@@ -43,13 +43,14 @@ pub struct MetricsParams {
pub address: Option,
/// Metrics registry. May be `Some(_)` if several components share the same endpoint.
pub registry: Option,
+ /// Prefix that must be used in metric names.
+ pub metrics_prefix: Option,
}
/// Metrics API.
-pub trait Metrics: Clone + Send + Sync + 'static {
- /// Register metrics in the registry.
- fn register(&self, registry: &Registry) -> Result<(), String>;
-}
+pub trait Metrics: Clone + Send + Sync + 'static {}
+
+impl Metrics for T {}
/// Standalone metrics API.
///
@@ -90,8 +91,21 @@ impl MetricsParams {
MetricsParams {
address: None,
registry: None,
+ metrics_prefix: None,
}
}
+
+ /// Do not expose metrics.
+ pub fn disable(mut self) -> Self {
+ self.address = None;
+ self
+ }
+
+ /// Set prefix to use in metric names.
+ pub fn metrics_prefix(mut self, prefix: String) -> Self {
+ self.metrics_prefix = Some(prefix);
+ self
+ }
}
impl From> for MetricsParams {
@@ -99,10 +113,20 @@ impl From > for MetricsParams {
MetricsParams {
address,
registry: None,
+ metrics_prefix: None,
}
}
}
+/// Returns metric name optionally prefixed with given prefix.
+pub fn metric_name(prefix: Option<&str>, name: &str) -> String {
+ if let Some(prefix) = prefix {
+ format!("{}_{}", prefix, name)
+ } else {
+ name.into()
+ }
+}
+
/// Set value of gauge metric.
///
/// If value is `Ok(None)` or `Err(_)`, metric would have default value.
diff --git a/bridges/relays/utils/src/metrics/float_json_value.rs b/bridges/relays/utils/src/metrics/float_json_value.rs
index 902b3e8195..d61f9cac7c 100644
--- a/bridges/relays/utils/src/metrics/float_json_value.rs
+++ b/bridges/relays/utils/src/metrics/float_json_value.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see .
-use crate::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64};
+use crate::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, F64};
use async_trait::async_trait;
use std::time::Duration;
@@ -32,16 +32,19 @@ pub struct FloatJsonValueMetric {
impl FloatJsonValueMetric {
/// Create new metric instance with given name and help.
- pub fn new(url: String, json_path: String, name: String, help: String) -> Self {
- FloatJsonValueMetric {
+ pub fn new(
+ registry: &Registry,
+ prefix: Option<&str>,
+ url: String,
+ json_path: String,
+ name: String,
+ help: String,
+ ) -> Result {
+ Ok(FloatJsonValueMetric {
url,
json_path,
- metric: Gauge::new(name, help).expect(
- "only fails if gauge options are customized;\
- we use default options;\
- qed",
- ),
- }
+ metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
+ })
}
/// Read value from HTTP service.
@@ -69,13 +72,6 @@ impl FloatJsonValueMetric {
}
}
-impl Metrics for FloatJsonValueMetric {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
- }
-}
-
#[async_trait]
impl StandaloneMetrics for FloatJsonValueMetric {
fn update_interval(&self) -> Duration {
diff --git a/bridges/relays/utils/src/metrics/global.rs b/bridges/relays/utils/src/metrics/global.rs
index 98c7d9a570..d212480510 100644
--- a/bridges/relays/utils/src/metrics/global.rs
+++ b/bridges/relays/utils/src/metrics/global.rs
@@ -16,7 +16,9 @@
//! Global system-wide Prometheus metrics exposed by relays.
-use crate::metrics::{register, Gauge, GaugeVec, Metrics, Opts, Registry, StandaloneMetrics, F64, U64};
+use crate::metrics::{
+ metric_name, register, Gauge, GaugeVec, Opts, PrometheusError, Registry, StandaloneMetrics, F64, U64,
+};
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
@@ -35,12 +37,30 @@ pub struct GlobalMetrics {
process_memory_usage_bytes: Gauge,
}
-impl Metrics for GlobalMetrics {
- fn register(&self, registry: &Registry) -> Result<(), String> {
- register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
- register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
- register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
- Ok(())
+impl GlobalMetrics {
+ /// Create and register global metrics.
+ pub fn new(registry: &Registry, prefix: Option<&str>) -> Result {
+ Ok(GlobalMetrics {
+ system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
+ system_average_load: register(
+ GaugeVec::new(
+ Opts::new(metric_name(prefix, "system_average_load"), "System load average"),
+ &["over"],
+ )?,
+ registry,
+ )?,
+ process_cpu_usage_percentage: register(
+ Gauge::new(metric_name(prefix, "process_cpu_usage_percentage"), "Process CPU usage")?,
+ registry,
+ )?,
+ process_memory_usage_bytes: register(
+ Gauge::new(
+ metric_name(prefix, "process_memory_usage_bytes"),
+ "Process memory (resident set size) usage",
+ )?,
+ registry,
+ )?,
+ })
}
}
@@ -89,20 +109,3 @@ impl StandaloneMetrics for GlobalMetrics {
UPDATE_INTERVAL
}
}
-
-impl Default for GlobalMetrics {
- fn default() -> Self {
- GlobalMetrics {
- system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
- system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
- .expect("metric is static and thus valid; qed"),
- process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage")
- .expect("metric is static and thus valid; qed"),
- process_memory_usage_bytes: Gauge::new(
- "process_memory_usage_bytes",
- "Process memory (resident set size) usage",
- )
- .expect("metric is static and thus valid; qed"),
- }
- }
-}
diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs
index 1a8d63e1db..8790b0913e 100644
--- a/bridges/relays/utils/src/relay_loop.rs
+++ b/bridges/relays/utils/src/relay_loop.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see .
-use crate::metrics::{Metrics, MetricsAddress, MetricsParams, StandaloneMetrics};
+use crate::metrics::{Metrics, MetricsAddress, MetricsParams, PrometheusError, StandaloneMetrics};
use crate::{FailedClient, MaybeConnectionError};
use async_trait::async_trait;
@@ -45,9 +45,7 @@ pub fn relay_loop(source_client: SC, target_client: TC) -> Loop) -> LoopMetrics<(), (), ()> {
- assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
-
+pub fn relay_metrics(prefix: Option, params: MetricsParams) -> LoopMetrics<(), (), ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
@@ -55,9 +53,9 @@ pub fn relay_metrics(prefix: String, address: Option) -> LoopMet
target_client: (),
loop_metric: None,
},
- address,
- registry: Registry::new_custom(Some(prefix), None)
- .expect("only fails if prefix is empty; prefix is not empty; qed"),
+ address: params.address,
+ registry: params.registry.unwrap_or_else(|| create_metrics_registry(prefix)),
+ metrics_prefix: params.metrics_prefix,
loop_metric: None,
}
}
@@ -75,6 +73,7 @@ pub struct LoopMetrics {
relay_loop: Loop,
address: Option,
registry: Registry,
+ metrics_prefix: Option,
loop_metric: Option,
}
@@ -86,11 +85,7 @@ impl Loop {
}
/// Start building loop metrics using given prefix.
- ///
- /// Panics if `prefix` is empty.
- pub fn with_metrics(self, prefix: String, params: MetricsParams) -> LoopMetrics {
- assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
-
+ pub fn with_metrics(self, prefix: Option, params: MetricsParams) -> LoopMetrics {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: self.reconnect_delay,
@@ -99,11 +94,8 @@ impl Loop {
loop_metric: None,
},
address: params.address,
- registry: match params.registry {
- Some(registry) => registry,
- None => Registry::new_custom(Some(prefix), None)
- .expect("only fails if prefix is empty; prefix is not empty; qed"),
- },
+ registry: params.registry.unwrap_or_else(|| create_metrics_registry(prefix)),
+ metrics_prefix: params.metrics_prefix,
loop_metric: None,
}
}
@@ -177,21 +169,34 @@ impl LoopMetrics {
/// Add relay loop metrics.
///
/// Loop metrics will be passed to the loop callback.
- pub fn loop_metric(self, loop_metric: NewLM) -> Result, String> {
- loop_metric.register(&self.registry)?;
+ pub fn loop_metric(
+ self,
+ create_metric: impl FnOnce(&Registry, Option<&str>) -> Result,
+ ) -> Result, String> {
+ let loop_metric = create_metric(&self.registry, self.metrics_prefix.as_deref()).map_err(|e| e.to_string())?;
Ok(LoopMetrics {
relay_loop: self.relay_loop,
address: self.address,
registry: self.registry,
+ metrics_prefix: self.metrics_prefix,
loop_metric: Some(loop_metric),
})
}
/// Add standalone metrics.
- pub fn standalone_metric(self, standalone_metrics: M) -> Result {
- standalone_metrics.register(&self.registry)?;
- standalone_metrics.spawn();
+ pub fn standalone_metric(
+ self,
+ create_metric: impl FnOnce(&Registry, Option<&str>) -> Result,
+ ) -> Result {
+ // since standalone metrics are updating themselves, we may just ignore the fact that the same
+ // standalone metric is exposed by several loops && only spawn single metric
+ match create_metric(&self.registry, self.metrics_prefix.as_deref()) {
+ Ok(standalone_metrics) => standalone_metrics.spawn(),
+ Err(PrometheusError::AlreadyReg) => (),
+ Err(e) => return Err(e.to_string()),
+ }
+
Ok(self)
}
@@ -200,6 +205,7 @@ impl LoopMetrics {
MetricsParams {
address: self.address,
registry: Some(self.registry),
+ metrics_prefix: self.metrics_prefix,
}
}
@@ -237,3 +243,14 @@ impl LoopMetrics {
})
}
}
+
+/// Create new registry with global metrics.
+fn create_metrics_registry(prefix: Option) -> Registry {
+ match prefix {
+ Some(prefix) => {
+ assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
+ Registry::new_custom(Some(prefix), None).expect("only fails if prefix is empty; prefix is not empty; qed")
+ }
+ None => Registry::new(),
+ }
+}