Complex headers+messages Millau<->Rialto relay (#878)

* complex headers+messages relay

* post-merge fix

* fix + test issue with on-demand not starting
This commit is contained in:
Svyatoslav Nikolsky
2021-04-14 13:01:03 +03:00
committed by Bastian Köcher
parent 0d60f42b5e
commit e2131724fb
29 changed files with 931 additions and 308 deletions
@@ -33,6 +33,7 @@ pub(crate) mod send_message;
mod derive_account;
mod init_bridge;
mod relay_headers;
mod relay_headers_and_messages;
mod relay_messages;
/// Parse relay CLI args.
@@ -54,6 +55,13 @@ pub enum Command {
/// Ties up to `Messages` pallets on both chains and starts relaying messages.
/// Requires the header relay to be already running.
RelayMessages(relay_messages::RelayMessages),
/// Start headers and messages relay between two Substrate chains.
///
/// This high-level relay internally starts four low-level relays: two `RelayHeaders`
/// and two `RelayMessages` relays. Headers are only relayed when they are required by
/// the message relays - i.e. when there are messages or confirmations that needs to be
/// relayed between chains.
RelayHeadersAndMessages(relay_headers_and_messages::RelayHeadersAndMessages),
/// Initialize on-chain bridge pallet with current header data.
///
/// Sends initialization transaction to bootstrap the bridge with current finalized block data.
@@ -86,6 +94,7 @@ impl Command {
match self {
Self::RelayHeaders(arg) => arg.run().await?,
Self::RelayMessages(arg) => arg.run().await?,
Self::RelayHeadersAndMessages(arg) => arg.run().await?,
Self::InitBridge(arg) => arg.run().await?,
Self::SendMessage(arg) => arg.run().await?,
Self::EncodeCall(arg) => arg.run().await?,
@@ -0,0 +1,183 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Complex headers+messages relays support.
//!
//! To add new complex relay between `ChainA` and `ChainB`, you must:
//!
//! 1) ensure that there's a `declare_chain_options!(...)` for both chains;
//! 2) add `declare_bridge_options!(...)` for the bridge;
//! 3) add bridge support to the `select_bridge! { ... }` macro.
use crate::cli::{CliChain, HexLaneId, PrometheusParams};
use crate::declare_chain_options;
use crate::messages_lane::MessagesRelayParams;
use crate::on_demand_headers::OnDemandHeadersRelay;
use futures::{FutureExt, TryFutureExt};
use relay_utils::metrics::MetricsParams;
use structopt::StructOpt;
/// Start headers+messages relayer process.
#[derive(StructOpt)]
pub enum RelayHeadersAndMessages {
MillauRialto(MillauRialtoHeadersAndMessages),
}
/// Parameters that have the same names across all bridges.
#[derive(StructOpt)]
pub struct HeadersAndMessagesSharedParams {
/// Hex-encoded lane id that should be served by the relay. Defaults to `00000000`.
#[structopt(long, default_value = "00000000")]
lane: HexLaneId,
#[structopt(flatten)]
prometheus_params: PrometheusParams,
}
// The reason behind this macro is that 'normal' relays are using source and target chains terminology,
// which is unusable for both-way relays (if you're relaying headers from Rialto to Millau and from
// Millau to Rialto, then which chain is source?).
macro_rules! declare_bridge_options {
($chain1:ident, $chain2:ident) => {
paste::item! {
#[doc = $chain1 " and " $chain2 " headers+messages relay params."]
#[derive(StructOpt)]
pub struct [<$chain1 $chain2 HeadersAndMessages>] {
#[structopt(flatten)]
shared: HeadersAndMessagesSharedParams,
#[structopt(flatten)]
left: [<$chain1 ConnectionParams>],
#[structopt(flatten)]
left_sign: [<$chain1 SigningParams>],
#[structopt(flatten)]
right: [<$chain2 ConnectionParams>],
#[structopt(flatten)]
right_sign: [<$chain2 SigningParams>],
}
#[allow(unreachable_patterns)]
impl From<RelayHeadersAndMessages> for [<$chain1 $chain2 HeadersAndMessages>] {
fn from(relay_params: RelayHeadersAndMessages) -> [<$chain1 $chain2 HeadersAndMessages>] {
match relay_params {
RelayHeadersAndMessages::[<$chain1 $chain2>](params) => params,
_ => unreachable!(),
}
}
}
}
};
}
macro_rules! select_bridge {
($bridge: expr, $generic: tt) => {
match $bridge {
RelayHeadersAndMessages::MillauRialto(_) => {
type Params = MillauRialtoHeadersAndMessages;
type Left = relay_millau_client::Millau;
type Right = relay_rialto_client::Rialto;
type LeftToRightFinality = crate::rialto_millau::millau_headers_to_rialto::MillauFinalityToRialto;
type RightToLeftFinality = crate::rialto_millau::rialto_headers_to_millau::RialtoFinalityToMillau;
type LeftToRightMessages = crate::rialto_millau::millau_messages_to_rialto::MillauMessagesToRialto;
type RightToLeftMessages = crate::rialto_millau::rialto_messages_to_millau::RialtoMessagesToMillau;
use crate::rialto_millau::millau_messages_to_rialto::run as left_to_right_messages;
use crate::rialto_millau::rialto_messages_to_millau::run as right_to_left_messages;
$generic
}
}
};
}
// All supported chains.
declare_chain_options!(Millau, millau);
declare_chain_options!(Rialto, rialto);
// All supported bridges.
declare_bridge_options!(Millau, Rialto);
impl RelayHeadersAndMessages {
/// Run the command.
pub async fn run(self) -> anyhow::Result<()> {
select_bridge!(self, {
let params: Params = self.into();
let left_client = params.left.to_client::<Left>().await?;
let left_sign = params.left_sign.to_keypair::<Left>()?;
let right_client = params.right.to_client::<Right>().await?;
let right_sign = params.right_sign.to_keypair::<Right>()?;
let lane = params.shared.lane.into();
let metrics_params: MetricsParams = params.shared.prometheus_params.into();
let metrics_params = relay_utils::relay_metrics(None, metrics_params).into_params();
let left_to_right_on_demand_headers = OnDemandHeadersRelay::new(
left_client.clone(),
right_client.clone(),
LeftToRightFinality::new(right_client.clone(), right_sign.clone()),
);
let right_to_left_on_demand_headers = OnDemandHeadersRelay::new(
right_client.clone(),
left_client.clone(),
RightToLeftFinality::new(left_client.clone(), left_sign.clone()),
);
let left_to_right_messages = left_to_right_messages(MessagesRelayParams {
source_client: left_client.clone(),
source_sign: left_sign.clone(),
target_client: right_client.clone(),
target_sign: right_sign.clone(),
source_to_target_headers_relay: Some(left_to_right_on_demand_headers.clone()),
target_to_source_headers_relay: Some(right_to_left_on_demand_headers.clone()),
lane_id: lane,
metrics_params: metrics_params
.clone()
.disable()
.metrics_prefix(messages_relay::message_lane_loop::metrics_prefix::<LeftToRightMessages>(&lane)),
})
.map_err(|e| anyhow::format_err!("{}", e))
.boxed();
let right_to_left_messages = right_to_left_messages(MessagesRelayParams {
source_client: right_client,
source_sign: right_sign,
target_client: left_client.clone(),
target_sign: left_sign.clone(),
source_to_target_headers_relay: Some(right_to_left_on_demand_headers),
target_to_source_headers_relay: Some(left_to_right_on_demand_headers),
lane_id: lane,
metrics_params: metrics_params
.clone()
.disable()
.metrics_prefix(messages_relay::message_lane_loop::metrics_prefix::<RightToLeftMessages>(&lane)),
})
.map_err(|e| anyhow::format_err!("{}", e))
.boxed();
relay_utils::relay_metrics(None, metrics_params)
.expose()
.await
.map_err(|e| anyhow::format_err!("{}", e))?;
futures::future::select(left_to_right_messages, right_to_left_messages)
.await
.factor_first()
.0
})
}
}
@@ -19,7 +19,9 @@ use crate::cli::{
HexLaneId, PrometheusParams, SourceConnectionParams, SourceSigningParams, TargetConnectionParams,
TargetSigningParams,
};
use crate::messages_lane::MessagesRelayParams;
use crate::select_full_bridge;
use structopt::StructOpt;
/// Start messages relayer process.
@@ -52,14 +54,16 @@ impl RelayMessages {
let target_client = self.target.to_client::<Target>().await?;
let target_sign = self.target_sign.to_keypair::<Target>()?;
relay_messages(
relay_messages(MessagesRelayParams {
source_client,
source_sign,
target_client,
target_sign,
self.lane.into(),
self.prometheus_params.into(),
)
source_to_target_headers_relay: None,
target_to_source_headers_relay: None,
lane_id: self.lane.into(),
metrics_params: self.prometheus_params.into(),
})
.await
.map_err(|e| anyhow::format_err!("{}", e))
})
+1
View File
@@ -27,6 +27,7 @@ mod headers_initialize;
mod messages_lane;
mod messages_source;
mod messages_target;
mod on_demand_headers;
mod rialto_millau;
@@ -16,15 +16,36 @@
use crate::messages_source::SubstrateMessagesProof;
use crate::messages_target::SubstrateMessagesReceivingProof;
use crate::on_demand_headers::OnDemandHeadersRelay;
use bp_messages::MessageNonce;
use bp_messages::{LaneId, MessageNonce};
use frame_support::weights::Weight;
use messages_relay::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf};
use relay_utils::BlockNumberBase;
use relay_utils::{metrics::MetricsParams, BlockNumberBase};
use sp_core::Bytes;
use std::ops::RangeInclusive;
/// Substrate <-> Substrate messages relay parameters.
pub struct MessagesRelayParams<SC: Chain, SS, TC: Chain, TS> {
/// Messages source client.
pub source_client: Client<SC>,
/// Sign parameters for messages source chain.
pub source_sign: SS,
/// Messages target client.
pub target_client: Client<TC>,
/// Sign parameters for messages target chain.
pub target_sign: TS,
/// Optional on-demand source to target headers relay.
pub source_to_target_headers_relay: Option<OnDemandHeadersRelay<SC>>,
/// Optional on-demand target to source headers relay.
pub target_to_source_headers_relay: Option<OnDemandHeadersRelay<TC>>,
/// Identifier of lane that needs to be served.
pub lane_id: LaneId,
/// Metrics parameters.
pub metrics_params: MetricsParams,
}
/// Message sync pipeline for Substrate <-> Substrate relays.
pub trait SubstrateMessageLane: MessageLane {
/// Name of the runtime method that returns dispatch weight of outbound messages at the source chain.
@@ -19,6 +19,7 @@
//! <BridgedName> chain.
use crate::messages_lane::SubstrateMessageLane;
use crate::on_demand_headers::OnDemandHeadersRelay;
use async_trait::async_trait;
use bp_messages::{LaneId, MessageNonce};
@@ -45,22 +46,30 @@ use std::{marker::PhantomData, ops::RangeInclusive};
pub type SubstrateMessagesProof<C> = (Weight, FromBridgedChainMessagesProof<HashOf<C>>);
/// Substrate client as Substrate messages source.
pub struct SubstrateMessagesSource<C: Chain, P, R, I> {
pub struct SubstrateMessagesSource<C: Chain, P: SubstrateMessageLane, R, I> {
client: Client<C>,
lane: P,
lane_id: LaneId,
instance: InstanceId,
target_to_source_headers_relay: Option<OnDemandHeadersRelay<P::TargetChain>>,
_phantom: PhantomData<(R, I)>,
}
impl<C: Chain, P, R, I> SubstrateMessagesSource<C, P, R, I> {
impl<C: Chain, P: SubstrateMessageLane, R, I> SubstrateMessagesSource<C, P, R, I> {
/// Create new Substrate headers source.
pub fn new(client: Client<C>, lane: P, lane_id: LaneId, instance: InstanceId) -> Self {
pub fn new(
client: Client<C>,
lane: P,
lane_id: LaneId,
instance: InstanceId,
target_to_source_headers_relay: Option<OnDemandHeadersRelay<P::TargetChain>>,
) -> Self {
SubstrateMessagesSource {
client,
lane,
lane_id,
instance,
target_to_source_headers_relay,
_phantom: Default::default(),
}
}
@@ -73,6 +82,7 @@ impl<C: Chain, P: SubstrateMessageLane, R, I> Clone for SubstrateMessagesSource<
lane: self.lane.clone(),
lane_id: self.lane_id,
instance: self.instance,
target_to_source_headers_relay: self.target_to_source_headers_relay.clone(),
_phantom: Default::default(),
}
}
@@ -106,6 +116,7 @@ where
SourceHeaderHash = <C::Header as HeaderT>::Hash,
SourceChain = C,
>,
P::TargetChain: Chain<Hash = P::TargetHeaderHash, BlockNumber = P::TargetHeaderNumber>,
P::TargetHeaderNumber: Decode,
P::TargetHeaderHash: Decode,
R: Send + Sync + MessagesConfig<I>,
@@ -226,7 +237,11 @@ where
Ok(())
}
async fn activate_target_to_source_headers_relay(&self, _activate: bool) {}
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>) {
if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
target_to_source_headers_relay.require_finalized_header(id);
}
}
}
pub async fn read_client_state<SelfChain, BridgedHeaderHash, BridgedHeaderNumber>(
@@ -20,6 +20,7 @@
use crate::messages_lane::SubstrateMessageLane;
use crate::messages_source::read_client_state;
use crate::on_demand_headers::OnDemandHeadersRelay;
use async_trait::async_trait;
use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState};
@@ -45,22 +46,30 @@ pub type SubstrateMessagesReceivingProof<C> = (
);
/// Substrate client as Substrate messages target.
pub struct SubstrateMessagesTarget<C: Chain, P, R, I> {
pub struct SubstrateMessagesTarget<C: Chain, P: SubstrateMessageLane, R, I> {
client: Client<C>,
lane: P,
lane_id: LaneId,
instance: InstanceId,
source_to_target_headers_relay: Option<OnDemandHeadersRelay<P::SourceChain>>,
_phantom: PhantomData<(R, I)>,
}
impl<C: Chain, P, R, I> SubstrateMessagesTarget<C, P, R, I> {
impl<C: Chain, P: SubstrateMessageLane, R, I> SubstrateMessagesTarget<C, P, R, I> {
/// Create new Substrate headers target.
pub fn new(client: Client<C>, lane: P, lane_id: LaneId, instance: InstanceId) -> Self {
pub fn new(
client: Client<C>,
lane: P,
lane_id: LaneId,
instance: InstanceId,
source_to_target_headers_relay: Option<OnDemandHeadersRelay<P::SourceChain>>,
) -> Self {
SubstrateMessagesTarget {
client,
lane,
lane_id,
instance,
source_to_target_headers_relay,
_phantom: Default::default(),
}
}
@@ -73,6 +82,7 @@ impl<C: Chain, P: SubstrateMessageLane, R, I> Clone for SubstrateMessagesTarget<
lane: self.lane.clone(),
lane_id: self.lane_id,
instance: self.instance,
source_to_target_headers_relay: self.source_to_target_headers_relay.clone(),
_phantom: Default::default(),
}
}
@@ -106,6 +116,7 @@ where
TargetHeaderNumber = <C::Header as HeaderT>::Number,
TargetHeaderHash = <C::Header as HeaderT>::Hash,
>,
P::SourceChain: Chain<Hash = P::SourceHeaderHash, BlockNumber = P::SourceHeaderNumber>,
P::SourceHeaderNumber: Decode,
P::SourceHeaderHash: Decode,
R: Send + Sync + MessagesConfig<I>,
@@ -213,5 +224,9 @@ where
Ok(nonces)
}
async fn activate_source_to_target_headers_relay(&self, _activate: bool) {}
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>) {
if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay {
source_to_target_headers_relay.require_finalized_header(id);
}
}
}
@@ -0,0 +1,255 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! On-demand Substrate -> Substrate headers relay.
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
use crate::finality_target::SubstrateFinalityTarget;
use bp_header_chain::justification::GrandpaJustification;
use finality_relay::TargetClient as FinalityTargetClient;
use futures::{
channel::{mpsc, oneshot},
select, FutureExt, StreamExt,
};
use num_traits::Zero;
use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader};
use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
use std::fmt::Debug;
/// On-demand Substrate <-> Substrate headers relay.
///
/// This relay may be started by messages whenever some other relay (e.g. messages relay) needs more
/// headers to be relayed to continue its regular work. When enough headers are relayed, on-demand
/// relay may be deactivated.
#[derive(Clone)]
pub struct OnDemandHeadersRelay<SourceChain: Chain> {
/// Background task name.
background_task_name: String,
/// Required headers to background sender.
required_header_tx: mpsc::Sender<HeaderId<SourceChain::Hash, SourceChain::BlockNumber>>,
}
impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
/// Create new on-demand headers relay.
pub fn new<TargetChain: Chain, TargetSign>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
) -> Self
where
SourceChain: Chain + Debug,
SourceChain::BlockNumber: BlockNumberBase,
TargetChain: Chain + Debug,
TargetChain::BlockNumber: BlockNumberBase,
TargetSign: Clone + Send + Sync + 'static,
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>: SubstrateFinalitySyncPipeline<
Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>,
Header = SyncHeader<SourceChain::Header>,
FinalityProof = GrandpaJustification<SourceChain::Header>,
TargetChain = TargetChain,
>,
SubstrateFinalityTarget<TargetChain, SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>:
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
{
let (required_header_tx, required_header_rx) = mpsc::channel(1);
async_std::task::spawn(async move {
background_task(source_client, target_client, pipeline, required_header_rx).await;
});
let background_task_name = format!(
"{}-background",
on_demand_headers_relay_name::<SourceChain, TargetChain>()
);
OnDemandHeadersRelay {
background_task_name,
required_header_tx,
}
}
/// Someone is asking us to relay given finalized header.
pub fn require_finalized_header(&self, header_id: HeaderIdOf<SourceChain>) {
if let Err(error) = self.required_header_tx.clone().try_send(header_id) {
log::error!(
target: "bridge",
"Failed to send require header id {:?} to {:?}: {:?}",
header_id,
self.background_task_name,
error,
);
}
}
}
/// Background task that is responsible for starting and stopping headers relay when required.
async fn background_task<SourceChain, TargetChain, TargetSign>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
mut required_header_rx: mpsc::Receiver<HeaderIdOf<SourceChain>>,
) where
SourceChain: Chain + Debug,
SourceChain::BlockNumber: BlockNumberBase,
TargetChain: Chain + Debug,
TargetChain::BlockNumber: BlockNumberBase,
TargetSign: Clone + Send + Sync + 'static,
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>: SubstrateFinalitySyncPipeline<
Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>,
Header = SyncHeader<SourceChain::Header>,
FinalityProof = GrandpaJustification<SourceChain::Header>,
TargetChain = TargetChain,
>,
SubstrateFinalityTarget<TargetChain, SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>:
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
{
let relay_task_name = on_demand_headers_relay_name::<SourceChain, TargetChain>();
let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
let mut active_headers_relay = None;
let mut required_header_number = Zero::zero();
let mut relay_exited_rx = futures::future::pending().left_future();
loop {
// wait for next target block or for new required header
select! {
_ = async_std::task::sleep(TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {},
required_header_id = required_header_rx.next() => {
match required_header_id {
Some(required_header_id) => {
if required_header_id.0 > required_header_number {
required_header_number = required_header_id.0;
}
},
None => {
// that's the only way to exit background task - to drop `required_header_tx`
break
},
}
},
_ = relay_exited_rx => {
// there could be a situation when we're receiving exit signals after we
// have already stopped relay or when we have already started new relay.
// but it isn't critical, because even if we'll accidentally stop new relay
// we'll restart it almost immediately
stop_on_demand_headers_relay(active_headers_relay.take()).await;
},
}
// read best finalized source block from target
let available_header_number = match finality_target.best_finalized_source_block_number().await {
Ok(available_header_number) => available_header_number,
Err(error) => {
log::error!(
target: "bridge",
"Failed to read best finalized {} header from {} in {} relay: {:?}",
SourceChain::NAME,
TargetChain::NAME,
relay_task_name,
error,
);
// we don't know what's happening with target client, so better to stop on-demand relay than
// submit unneeded transactions
// => assume that required header is known to the target node
required_header_number
}
};
// start or stop headers relay if required
let activate = required_header_number > available_header_number;
match (activate, active_headers_relay.is_some()) {
(true, false) => {
let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
active_headers_relay = start_on_demand_headers_relay(
relay_task_name.clone(),
relay_exited_tx,
source_client.clone(),
target_client.clone(),
pipeline.clone(),
);
if active_headers_relay.is_some() {
relay_exited_rx = new_relay_exited_rx.right_future();
}
}
(false, true) => {
stop_on_demand_headers_relay(active_headers_relay.take()).await;
}
_ => (),
}
}
}
/// On-demand headers relay task name.
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
}
/// Start on-demand headers relay task.
fn start_on_demand_headers_relay<SourceChain: Chain, TargetChain: Chain, TargetSign>(
task_name: String,
relay_exited_tx: oneshot::Sender<()>,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
) -> Option<async_std::task::JoinHandle<()>>
where
SourceChain::BlockNumber: BlockNumberBase,
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>: SubstrateFinalitySyncPipeline<
Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>,
Header = SyncHeader<SourceChain::Header>,
FinalityProof = GrandpaJustification<SourceChain::Header>,
TargetChain = TargetChain,
>,
TargetSign: 'static,
{
let headers_relay_future =
crate::finality_pipeline::run(pipeline, source_client, target_client, MetricsParams::disabled());
let closure_task_name = task_name.clone();
async_std::task::Builder::new()
.name(task_name.clone())
.spawn(async move {
log::info!(target: "bridge", "Starting {} headers relay", closure_task_name);
let result = headers_relay_future.await;
log::trace!(target: "bridge", "{} headers relay has exited. Result: {:?}", closure_task_name, result);
let _ = relay_exited_tx.send(());
})
.map_err(|error| {
log::error!(
target: "bridge",
"Failed to start {} relay: {:?}",
task_name,
error,
);
})
.ok()
}
/// Stop on-demand headers relay task.
async fn stop_on_demand_headers_relay(task: Option<async_std::task::JoinHandle<()>>) {
if let Some(task) = task {
let task_name = task
.task()
.name()
.expect("on-demand tasks are always started with name; qed")
.to_string();
log::trace!(target: "bridge", "Cancelling {} headers relay", task_name);
task.cancel().await;
log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
}
}
@@ -16,12 +16,13 @@
//! Millau-to-Rialto messages sync entrypoint.
use super::{MillauClient, RialtoClient};
use crate::messages_lane::{select_delivery_transaction_limits, SubstrateMessageLane, SubstrateMessageLaneToSubstrate};
use crate::messages_lane::{
select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
use bp_messages::{LaneId, MessageNonce};
use bp_messages::MessageNonce;
use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE};
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::Encode;
@@ -33,12 +34,12 @@ use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
/// Millau-to-Rialto message lane.
type MillauMessagesToRialto = SubstrateMessageLaneToSubstrate<Millau, MillauSigningParams, Rialto, RialtoSigningParams>;
pub type MillauMessagesToRialto =
SubstrateMessageLaneToSubstrate<Millau, MillauSigningParams, Rialto, RialtoSigningParams>;
impl SubstrateMessageLane for MillauMessagesToRialto {
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str =
@@ -143,21 +144,18 @@ type RialtoTargetClient = SubstrateMessagesTarget<
/// Run Millau-to-Rialto messages sync.
pub async fn run(
millau_client: MillauClient,
millau_sign: MillauSigningParams,
rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams,
lane_id: LaneId,
metrics_params: MetricsParams,
params: MessagesRelayParams<Millau, MillauSigningParams, Rialto, RialtoSigningParams>,
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_millau = (*millau_sign.public().as_array_ref()).into();
let relayer_id_at_millau = (*params.source_sign.public().as_array_ref()).into();
let lane_id = params.lane_id;
let source_client = params.source_client;
let lane = MillauMessagesToRialto {
source_client: millau_client.clone(),
source_sign: millau_sign,
target_client: rialto_client.clone(),
target_sign: rialto_sign,
source_client: source_client.clone(),
source_sign: params.source_sign,
target_client: params.target_client.clone(),
target_sign: params.target_sign,
relayer_id_at_source: relayer_id_at_millau,
};
@@ -198,24 +196,48 @@ pub async fn run(
max_messages_size_in_single_batch,
},
},
MillauSourceClient::new(millau_client.clone(), lane.clone(), lane_id, RIALTO_BRIDGE_INSTANCE),
RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE),
MillauSourceClient::new(
source_client.clone(),
lane.clone(),
lane_id,
RIALTO_BRIDGE_INSTANCE,
params.target_to_source_headers_relay,
),
RialtoTargetClient::new(
params.target_client,
lane,
lane_id,
MILLAU_BRIDGE_INSTANCE,
params.source_to_target_headers_relay,
),
relay_utils::relay_metrics(
messages_relay::message_lane_loop::metrics_prefix::<MillauMessagesToRialto>(&lane_id),
metrics_params.address,
Some(messages_relay::message_lane_loop::metrics_prefix::<
MillauMessagesToRialto,
>(&lane_id)),
params.metrics_params,
)
.standalone_metric(StorageProofOverheadMetric::new(
millau_client.clone(),
"millau_storage_proof_overhead".into(),
"Millau storage proof overhead".into(),
))?
.standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
millau_client,
sp_core::storage::StorageKey(millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec()),
Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE),
"millau_rialto_to_millau_conversion_rate".into(),
"Rialto to Millau tokens conversion rate (used by Rialto)".into(),
))?
.standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
"millau_storage_proof_overhead".into(),
"Millau storage proof overhead".into(),
)
})?
.standalone_metric(|registry, prefix| {
FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
registry,
prefix,
source_client,
sp_core::storage::StorageKey(
millau_runtime::rialto_messages::RialtoToMillauConversionRate::key().to_vec(),
),
Some(millau_runtime::rialto_messages::INITIAL_RIALTO_TO_MILLAU_CONVERSION_RATE),
"millau_rialto_to_millau_conversion_rate".into(),
"Rialto to Millau tokens conversion rate (used by Rialto)".into(),
)
})?
.into_params(),
futures::future::pending(),
)
@@ -22,11 +22,6 @@ pub mod rialto_headers_to_millau;
pub mod rialto_messages_to_millau;
pub mod westend_headers_to_millau;
/// Millau node client.
pub type MillauClient = relay_substrate_client::Client<Millau>;
/// Rialto node client.
pub type RialtoClient = relay_substrate_client::Client<Rialto>;
use crate::cli::{
bridge,
encode_call::{self, Call, CliEncodeCall},
@@ -16,12 +16,13 @@
//! Rialto-to-Millau messages sync entrypoint.
use super::{MillauClient, RialtoClient};
use crate::messages_lane::{select_delivery_transaction_limits, SubstrateMessageLane, SubstrateMessageLaneToSubstrate};
use crate::messages_lane::{
select_delivery_transaction_limits, MessagesRelayParams, SubstrateMessageLane, SubstrateMessageLaneToSubstrate,
};
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
use bp_messages::{LaneId, MessageNonce};
use bp_messages::MessageNonce;
use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE};
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::Encode;
@@ -33,12 +34,12 @@ use relay_substrate_client::{
metrics::{FloatStorageValueMetric, StorageProofOverheadMetric},
Chain, TransactionSignScheme,
};
use relay_utils::metrics::MetricsParams;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
/// Rialto-to-Millau message lane.
type RialtoMessagesToMillau = SubstrateMessageLaneToSubstrate<Rialto, RialtoSigningParams, Millau, MillauSigningParams>;
pub type RialtoMessagesToMillau =
SubstrateMessageLaneToSubstrate<Rialto, RialtoSigningParams, Millau, MillauSigningParams>;
impl SubstrateMessageLane for RialtoMessagesToMillau {
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str =
@@ -143,21 +144,18 @@ type MillauTargetClient = SubstrateMessagesTarget<
/// Run Rialto-to-Millau messages sync.
pub async fn run(
rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams,
millau_client: MillauClient,
millau_sign: MillauSigningParams,
lane_id: LaneId,
metrics_params: MetricsParams,
params: MessagesRelayParams<Rialto, RialtoSigningParams, Millau, MillauSigningParams>,
) -> Result<(), String> {
let stall_timeout = Duration::from_secs(5 * 60);
let relayer_id_at_rialto = (*rialto_sign.public().as_array_ref()).into();
let relayer_id_at_rialto = (*params.source_sign.public().as_array_ref()).into();
let lane_id = params.lane_id;
let source_client = params.source_client;
let lane = RialtoMessagesToMillau {
source_client: rialto_client.clone(),
source_sign: rialto_sign,
target_client: millau_client.clone(),
target_sign: millau_sign,
source_client: source_client.clone(),
source_sign: params.source_sign,
target_client: params.target_client.clone(),
target_sign: params.target_sign,
relayer_id_at_source: relayer_id_at_rialto,
};
@@ -197,24 +195,48 @@ pub async fn run(
max_messages_size_in_single_batch,
},
},
RialtoSourceClient::new(rialto_client.clone(), lane.clone(), lane_id, MILLAU_BRIDGE_INSTANCE),
MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE),
RialtoSourceClient::new(
source_client.clone(),
lane.clone(),
lane_id,
MILLAU_BRIDGE_INSTANCE,
params.target_to_source_headers_relay,
),
MillauTargetClient::new(
params.target_client,
lane,
lane_id,
RIALTO_BRIDGE_INSTANCE,
params.source_to_target_headers_relay,
),
relay_utils::relay_metrics(
messages_relay::message_lane_loop::metrics_prefix::<RialtoMessagesToMillau>(&lane_id),
metrics_params.address,
Some(messages_relay::message_lane_loop::metrics_prefix::<
RialtoMessagesToMillau,
>(&lane_id)),
params.metrics_params,
)
.standalone_metric(StorageProofOverheadMetric::new(
rialto_client.clone(),
"rialto_storage_proof_overhead".into(),
"Rialto storage proof overhead".into(),
))?
.standalone_metric(FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
rialto_client,
sp_core::storage::StorageKey(rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec()),
Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE),
"rialto_millau_to_rialto_conversion_rate".into(),
"Millau to Rialto tokens conversion rate (used by Millau)".into(),
))?
.standalone_metric(|registry, prefix| {
StorageProofOverheadMetric::new(
registry,
prefix,
source_client.clone(),
"rialto_storage_proof_overhead".into(),
"Rialto storage proof overhead".into(),
)
})?
.standalone_metric(|registry, prefix| {
FloatStorageValueMetric::<_, sp_runtime::FixedU128>::new(
registry,
prefix,
source_client,
sp_core::storage::StorageKey(
rialto_runtime::millau_messages::MillauToRialtoConversionRate::key().to_vec(),
),
Some(rialto_runtime::millau_messages::INITIAL_MILLAU_TO_RIALTO_CONVERSION_RATE),
"rialto_millau_to_rialto_conversion_rate".into(),
"Millau to Rialto tokens conversion rate (used by Millau)".into(),
)
})?
.into_params(),
futures::future::pending(),
)
@@ -36,22 +36,30 @@ impl SubstrateFinalitySyncPipeline for WestendFinalityToMillau {
fn customize_metrics(params: MetricsParams) -> anyhow::Result<MetricsParams> {
Ok(
relay_utils::relay_metrics(finality_relay::metrics_prefix::<Self>(), params.address)
relay_utils::relay_metrics(Some(finality_relay::metrics_prefix::<Self>()), params)
// Polkadot/Kusama prices are added as metrics here, because atm we don't have Polkadot <-> Kusama
// relays, but we want to test metrics/dashboards in advance
.standalone_metric(FloatJsonValueMetric::new(
"https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=usd".into(),
"$.polkadot.usd".into(),
"polkadot_price".into(),
"Polkadot price in USD".into(),
))
.standalone_metric(|registry, prefix| {
FloatJsonValueMetric::new(
registry,
prefix,
"https://api.coingecko.com/api/v3/simple/price?ids=Polkadot&vs_currencies=usd".into(),
"$.polkadot.usd".into(),
"polkadot_price".into(),
"Polkadot price in USD".into(),
)
})
.map_err(|e| anyhow::format_err!("{}", e))?
.standalone_metric(FloatJsonValueMetric::new(
"https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=usd".into(),
"$.kusama.usd".into(),
"kusama_price".into(),
"Kusama price in USD".into(),
))
.standalone_metric(|registry, prefix| {
FloatJsonValueMetric::new(
registry,
prefix,
"https://api.coingecko.com/api/v3/simple/price?ids=Kusama&vs_currencies=usd".into(),
"$.kusama.usd".into(),
"kusama_price".into(),
"Kusama price in USD".into(),
)
})
.map_err(|e| anyhow::format_err!("{}", e))?
.into_params(),
)
@@ -19,7 +19,7 @@ use crate::client::Client;
use async_trait::async_trait;
use codec::Decode;
use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64};
use relay_utils::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, F64};
use sp_core::storage::StorageKey;
use sp_runtime::{traits::UniqueSaturatedInto, FixedPointNumber};
use std::time::Duration;
@@ -39,29 +39,20 @@ pub struct FloatStorageValueMetric<C: Chain, T: Clone> {
impl<C: Chain, T: Decode + FixedPointNumber> FloatStorageValueMetric<C, T> {
/// Create new metric.
pub fn new(
registry: &Registry,
prefix: Option<&str>,
client: Client<C>,
storage_key: StorageKey,
maybe_default_value: Option<T>,
name: String,
help: String,
) -> Self {
FloatStorageValueMetric {
) -> Result<Self, PrometheusError> {
Ok(FloatStorageValueMetric {
client,
storage_key,
maybe_default_value,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
qed",
),
}
}
}
impl<C: Chain, T: Clone + Send + Sync + 'static> Metrics for FloatStorageValueMetric<C, T> {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
})
}
}
@@ -19,7 +19,7 @@ use crate::client::Client;
use crate::error::Error;
use async_trait::async_trait;
use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, U64};
use relay_utils::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, U64};
use sp_core::storage::StorageKey;
use sp_runtime::traits::Header as HeaderT;
use sp_storage::well_known_keys::CODE;
@@ -49,15 +49,17 @@ impl<C: Chain> Clone for StorageProofOverheadMetric<C> {
impl<C: Chain> StorageProofOverheadMetric<C> {
/// Create new metric instance with given name and help.
pub fn new(client: Client<C>, name: String, help: String) -> Self {
StorageProofOverheadMetric {
pub fn new(
registry: &Registry,
prefix: Option<&str>,
client: Client<C>,
name: String,
help: String,
) -> Result<Self, PrometheusError> {
Ok(StorageProofOverheadMetric {
client,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
qed",
),
}
metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
})
}
/// Returns approximate storage proof size overhead.
@@ -85,13 +87,6 @@ impl<C: Chain> StorageProofOverheadMetric<C> {
}
}
impl<C: Chain> Metrics for StorageProofOverheadMetric<C> {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
#[async_trait]
impl<C: Chain> StandaloneMetrics for StorageProofOverheadMetric<C> {
fn update_interval(&self) -> Duration {
+8 -6
View File
@@ -78,6 +78,11 @@ impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorag
}
}
/// Return prefix that will be used by default to expose Prometheus metrics of the exchange loop.
pub fn metrics_prefix<P: TransactionProofPipeline>() -> String {
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME)
}
/// Run proofs synchronization.
pub async fn run<P: TransactionProofPipeline>(
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
@@ -89,12 +94,9 @@ pub async fn run<P: TransactionProofPipeline>(
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
)
.loop_metric(ExchangeLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(|registry, prefix| ExchangeLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
@@ -17,7 +17,9 @@
//! Metrics for currency-exchange relay loop.
use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
use relay_utils::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
use relay_utils::metrics::{
metric_name, register, Counter, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64,
};
/// Exchange transactions relay metrics.
#[derive(Clone)]
@@ -30,31 +32,38 @@ pub struct ExchangeLoopMetrics {
processed_transactions: CounterVec<U64>,
}
impl Metrics for ExchangeLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.processed_blocks.clone(), registry).map_err(|e| e.to_string())?;
register(self.processed_transactions.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
impl Default for ExchangeLoopMetrics {
fn default() -> Self {
ExchangeLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best finalized block numbers"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
processed_blocks: Counter::new("processed_blocks", "Total number of processed blocks")
.expect("metric is static and thus valid; qed"),
processed_transactions: CounterVec::new(
Opts::new("processed_transactions", "Total number of processed transactions"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
}
impl ExchangeLoopMetrics {
/// Create and register exchange loop metrics.
pub fn new(registry: &Registry, prefix: Option<&str>) -> Result<Self, PrometheusError> {
Ok(ExchangeLoopMetrics {
best_block_numbers: register(
GaugeVec::new(
Opts::new(
metric_name(prefix, "best_block_numbers"),
"Best finalized block numbers",
),
&["type"],
)?,
registry,
)?,
processed_blocks: register(
Counter::new(
metric_name(prefix, "processed_blocks"),
"Total number of processed blocks",
)?,
registry,
)?,
processed_transactions: register(
CounterVec::new(
Opts::new(
metric_name(prefix, "processed_transactions"),
"Total number of processed transactions",
),
&["type"],
)?,
registry,
)?,
})
}
}
+3 -3
View File
@@ -105,9 +105,9 @@ pub async fn run<P: FinalitySyncPipeline>(
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_prefix::<P>(), metrics_params)
.loop_metric(SyncLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
+8 -3
View File
@@ -110,6 +110,11 @@ pub trait SyncMaintain<P: HeadersSyncPipeline>: Clone + Send + Sync {
impl<P: HeadersSyncPipeline> SyncMaintain<P> for () {}
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop.
pub fn metrics_prefix<P: HeadersSyncPipeline>() -> String {
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
}
/// Run headers synchronization.
#[allow(clippy::too_many_arguments)]
pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
@@ -124,9 +129,9 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), metrics_params)
.loop_metric(SyncLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
+26 -23
View File
@@ -20,7 +20,7 @@ use crate::sync::HeadersSync;
use crate::sync_types::{HeaderStatus, HeadersSyncPipeline};
use num_traits::Zero;
use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusError, Registry, U64};
/// Headers sync metrics.
#[derive(Clone)]
@@ -31,28 +31,31 @@ pub struct SyncLoopMetrics {
blocks_in_state: GaugeVec<U64>,
}
impl Metrics for SyncLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.blocks_in_state.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
impl Default for SyncLoopMetrics {
fn default() -> Self {
SyncLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best block numbers on source and target nodes"),
&["node"],
)
.expect("metric is static and thus valid; qed"),
blocks_in_state: GaugeVec::new(
Opts::new("blocks_in_state", "Number of blocks in given state"),
&["state"],
)
.expect("metric is static and thus valid; qed"),
}
impl SyncLoopMetrics {
/// Create and register headers loop metrics.
pub fn new(registry: &Registry, prefix: Option<&str>) -> Result<Self, PrometheusError> {
Ok(SyncLoopMetrics {
best_block_numbers: register(
GaugeVec::new(
Opts::new(
metric_name(prefix, "best_block_numbers"),
"Best block numbers on source and target nodes",
),
&["node"],
)?,
registry,
)?,
blocks_in_state: register(
GaugeVec::new(
Opts::new(
metric_name(prefix, "blocks_in_state"),
"Number of blocks in given state",
),
&["state"],
)?,
registry,
)?,
})
}
}
@@ -140,8 +140,8 @@ pub trait SourceClient<P: MessageLane>: RelayClient {
proof: P::MessagesReceivingProof,
) -> Result<(), Self::Error>;
/// Activate (or deactivate) headers relay that relays target headers to source node.
async fn activate_target_to_source_headers_relay(&self, activate: bool);
/// We need given finalized target header on source to continue synchronization.
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>);
}
/// Target client trait.
@@ -181,8 +181,8 @@ pub trait TargetClient<P: MessageLane>: RelayClient {
proof: P::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, Self::Error>;
/// Activate (or deactivate) headers relay that relays source headers to target node.
async fn activate_source_to_target_headers_relay(&self, activate: bool);
/// We need given finalized source header on target to continue synchronization.
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>);
}
/// State of the client.
@@ -232,9 +232,9 @@ pub async fn run<P: MessageLane>(
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.reconnect_delay(params.reconnect_delay)
.with_metrics(metrics_prefix::<P>(&params.lane), metrics_params)
.loop_metric(MessageLaneLoopMetrics::default())?
.standalone_metric(GlobalMetrics::default())?
.with_metrics(Some(metrics_prefix::<P>(&params.lane)), metrics_params)
.loop_metric(|registry, prefix| MessageLaneLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
@@ -475,8 +475,10 @@ pub(crate) mod tests {
target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce,
submitted_messages_proofs: Vec<TestMessagesProof>,
is_target_to_source_headers_relay_activated: bool,
is_source_to_target_headers_relay_activated: bool,
target_to_source_header_required: Option<TestTargetHeaderId>,
target_to_source_header_requirements: Vec<TestTargetHeaderId>,
source_to_target_header_required: Option<TestSourceHeaderId>,
source_to_target_header_requirements: Vec<TestSourceHeaderId>,
}
#[derive(Clone)]
@@ -582,9 +584,10 @@ pub(crate) mod tests {
Ok(())
}
async fn activate_target_to_source_headers_relay(&self, activate: bool) {
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<TestMessageLane>) {
let mut data = self.data.lock();
data.is_target_to_source_headers_relay_activated = activate;
data.target_to_source_header_required = Some(id);
data.target_to_source_header_requirements.push(id);
(self.tick)(&mut *data);
}
}
@@ -686,9 +689,10 @@ pub(crate) mod tests {
Ok(nonces)
}
async fn activate_source_to_target_headers_relay(&self, activate: bool) {
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<TestMessageLane>) {
let mut data = self.data.lock();
data.is_source_to_target_headers_relay_activated = activate;
data.source_to_target_header_required = Some(id);
data.source_to_target_header_requirements.push(id);
(self.tick)(&mut *data);
}
}
@@ -806,16 +810,16 @@ pub(crate) mod tests {
},
Arc::new(|data: &mut TestClientData| {
// headers relay must only be started when we need new target headers at source node
if data.is_target_to_source_headers_relay_activated {
if data.target_to_source_header_required.is_some() {
assert!(data.source_state.best_finalized_peer_at_best_self.0 < data.target_state.best_self.0);
data.is_target_to_source_headers_relay_activated = false;
data.target_to_source_header_required = None;
}
}),
Arc::new(move |data: &mut TestClientData| {
// headers relay must only be started when we need new source headers at target node
if data.is_target_to_source_headers_relay_activated {
if data.source_to_target_header_required.is_some() {
assert!(data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_self.0);
data.is_target_to_source_headers_relay_activated = false;
data.source_to_target_header_required = None;
}
// syncing source headers -> target chain (all at once)
if data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_finalized_self.0 {
@@ -837,7 +841,7 @@ pub(crate) mod tests {
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
}
// if source has received all messages receiving confirmations => increase source block so that confirmations may be sent
// if source has received all messages receiving confirmations => stop
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
@@ -853,5 +857,9 @@ pub(crate) mod tests {
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
// check that we have at least once required new source->target or target->source headers
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
}
@@ -166,8 +166,8 @@ where
type Error = C::Error;
type TargetNoncesData = DeliveryRaceTargetNoncesData;
async fn require_more_source_headers(&self, activate: bool) {
self.client.activate_source_to_target_headers_relay(activate).await
async fn require_source_header(&self, id: SourceHeaderIdOf<P>) {
self.client.require_source_header_on_target(id).await
}
async fn nonces(
@@ -291,6 +291,10 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
self.strategy.is_empty()
}
fn required_source_header_at_target(&self, current_best: &SourceHeaderIdOf<P>) -> Option<SourceHeaderIdOf<P>> {
self.strategy.required_source_header_at_target(current_best)
}
fn best_at_source(&self) -> Option<MessageNonce> {
self.strategy.best_at_source()
}
@@ -123,8 +123,9 @@ pub trait TargetClient<P: MessageRace> {
/// Type of the additional data from the target client, used by the race.
type TargetNoncesData: std::fmt::Debug;
/// Ask headers relay to relay more headers from race source to race target.
async fn require_more_source_headers(&self, activate: bool);
/// Ask headers relay to relay finalized headers up to (and including) given header
/// from race source to race target.
async fn require_source_header(&self, id: P::SourceHeaderId);
/// Return nonces that are known to the target client.
async fn nonces(
@@ -152,6 +153,8 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
/// Should return true if nothing has to be synced.
fn is_empty(&self) -> bool;
/// Return id of source header that is required to be on target to continue synchronization.
fn required_source_header_at_target(&self, current_best: &SourceHeaderId) -> Option<SourceHeaderId>;
/// Return best nonce at source node.
///
/// `Some` is returned only if we are sure that the value is greater or equal
@@ -219,7 +222,6 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
TargetNoncesData = TC::TargetNoncesData,
>,
) -> Result<(), FailedClient> {
let mut is_strategy_empty = true;
let mut progress_context = Instant::now();
let mut race_state = RaceState::default();
let mut stall_countdown = Instant::now();
@@ -307,6 +309,15 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
async_std::task::sleep,
|| format!("Error retrieving nonces from {}", P::source_name()),
).fail_if_connection_error(FailedClient::Source)?;
// ask for more headers if we have nonces to deliver and required headers are missing
let required_source_header_id = race_state
.best_finalized_source_header_id_at_best_target
.as_ref()
.and_then(|best|strategy.required_source_header_at_target(best));
if let Some(required_source_header_id) = required_source_header_id {
race_target.require_source_header(required_source_header_id).await;
}
},
nonces = target_best_nonces => {
target_best_nonces_required = false;
@@ -408,13 +419,6 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
progress_context = print_race_progress::<P, _>(progress_context, &strategy);
// ask for more headers if we have nonces to deliver
let prev_is_strategy_empty = is_strategy_empty;
is_strategy_empty = strategy.is_empty();
if is_strategy_empty != prev_is_strategy_empty {
race_target.require_more_source_headers(!is_strategy_empty).await;
}
if stall_countdown.elapsed() > stall_timeout {
log::warn!(
target: "bridge",
@@ -159,8 +159,8 @@ where
type Error = C::Error;
type TargetNoncesData = ();
async fn require_more_source_headers(&self, activate: bool) {
self.client.activate_target_to_source_headers_relay(activate).await
async fn require_source_header(&self, id: TargetHeaderIdOf<P>) {
self.client.require_target_header_on_source(id).await
}
async fn nonces(
@@ -162,6 +162,15 @@ where
self.source_queue.is_empty()
}
fn required_source_header_at_target(
&self,
current_best: &HeaderId<SourceHeaderHash, SourceHeaderNumber>,
) -> Option<HeaderId<SourceHeaderHash, SourceHeaderNumber>> {
self.source_queue
.back()
.and_then(|(h, _)| if h.0 > current_best.0 { Some(h.clone()) } else { None })
}
fn best_at_source(&self) -> Option<MessageNonce> {
let best_in_queue = self.source_queue.back().map(|(_, range)| range.end());
match (best_in_queue, self.best_target_nonce) {
+23 -20
View File
@@ -20,7 +20,7 @@ use crate::message_lane::MessageLane;
use crate::message_lane_loop::{SourceClientState, TargetClientState};
use bp_messages::MessageNonce;
use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusError, Registry, U64};
/// Message lane relay metrics.
///
@@ -34,25 +34,28 @@ pub struct MessageLaneLoopMetrics {
lane_state_nonces: GaugeVec<U64>,
}
impl Metrics for MessageLaneLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.lane_state_nonces.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
impl Default for MessageLaneLoopMetrics {
fn default() -> Self {
MessageLaneLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best finalized block numbers"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
lane_state_nonces: GaugeVec::new(Opts::new("lane_state_nonces", "Nonces of the lane state"), &["type"])
.expect("metric is static and thus valid; qed"),
}
impl MessageLaneLoopMetrics {
/// Create and register messages loop metrics.
pub fn new(registry: &Registry, prefix: Option<&str>) -> Result<Self, PrometheusError> {
Ok(MessageLaneLoopMetrics {
best_block_numbers: register(
GaugeVec::new(
Opts::new(
metric_name(prefix, "best_block_numbers"),
"Best finalized block numbers",
),
&["type"],
)?,
registry,
)?,
lane_state_nonces: register(
GaugeVec::new(
Opts::new(metric_name(prefix, "lane_state_nonces"), "Nonces of the lane state"),
&["type"],
)?,
registry,
)?,
})
}
}
+29 -5
View File
@@ -18,7 +18,7 @@ pub use float_json_value::FloatJsonValueMetric;
pub use global::GlobalMetrics;
pub use substrate_prometheus_endpoint::{
prometheus::core::{Atomic, Collector},
register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64,
register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, U64,
};
use async_trait::async_trait;
@@ -43,13 +43,14 @@ pub struct MetricsParams {
pub address: Option<MetricsAddress>,
/// Metrics registry. May be `Some(_)` if several components share the same endpoint.
pub registry: Option<Registry>,
/// Prefix that must be used in metric names.
pub metrics_prefix: Option<String>,
}
/// Metrics API.
pub trait Metrics: Clone + Send + Sync + 'static {
/// Register metrics in the registry.
fn register(&self, registry: &Registry) -> Result<(), String>;
}
pub trait Metrics: Clone + Send + Sync + 'static {}
impl<T: Clone + Send + Sync + 'static> Metrics for T {}
/// Standalone metrics API.
///
@@ -90,8 +91,21 @@ impl MetricsParams {
MetricsParams {
address: None,
registry: None,
metrics_prefix: None,
}
}
/// Do not expose metrics.
pub fn disable(mut self) -> Self {
self.address = None;
self
}
/// Set prefix to use in metric names.
pub fn metrics_prefix(mut self, prefix: String) -> Self {
self.metrics_prefix = Some(prefix);
self
}
}
impl From<Option<MetricsAddress>> for MetricsParams {
@@ -99,10 +113,20 @@ impl From<Option<MetricsAddress>> for MetricsParams {
MetricsParams {
address,
registry: None,
metrics_prefix: None,
}
}
}
/// Returns metric name optionally prefixed with given prefix.
pub fn metric_name(prefix: Option<&str>, name: &str) -> String {
if let Some(prefix) = prefix {
format!("{}_{}", prefix, name)
} else {
name.into()
}
}
/// Set value of gauge metric.
///
/// If value is `Ok(None)` or `Err(_)`, metric would have default value.
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, F64};
use crate::metrics::{metric_name, register, Gauge, PrometheusError, Registry, StandaloneMetrics, F64};
use async_trait::async_trait;
use std::time::Duration;
@@ -32,16 +32,19 @@ pub struct FloatJsonValueMetric {
impl FloatJsonValueMetric {
/// Create new metric instance with given name and help.
pub fn new(url: String, json_path: String, name: String, help: String) -> Self {
FloatJsonValueMetric {
pub fn new(
registry: &Registry,
prefix: Option<&str>,
url: String,
json_path: String,
name: String,
help: String,
) -> Result<Self, PrometheusError> {
Ok(FloatJsonValueMetric {
url,
json_path,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
qed",
),
}
metric: register(Gauge::new(metric_name(prefix, &name), help)?, registry)?,
})
}
/// Read value from HTTP service.
@@ -69,13 +72,6 @@ impl FloatJsonValueMetric {
}
}
impl Metrics for FloatJsonValueMetric {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.metric.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
#[async_trait]
impl StandaloneMetrics for FloatJsonValueMetric {
fn update_interval(&self) -> Duration {
+27 -24
View File
@@ -16,7 +16,9 @@
//! Global system-wide Prometheus metrics exposed by relays.
use crate::metrics::{register, Gauge, GaugeVec, Metrics, Opts, Registry, StandaloneMetrics, F64, U64};
use crate::metrics::{
metric_name, register, Gauge, GaugeVec, Opts, PrometheusError, Registry, StandaloneMetrics, F64, U64,
};
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
@@ -35,12 +37,30 @@ pub struct GlobalMetrics {
process_memory_usage_bytes: Gauge<U64>,
}
impl Metrics for GlobalMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
impl GlobalMetrics {
/// Create and register global metrics.
pub fn new(registry: &Registry, prefix: Option<&str>) -> Result<Self, PrometheusError> {
Ok(GlobalMetrics {
system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
system_average_load: register(
GaugeVec::new(
Opts::new(metric_name(prefix, "system_average_load"), "System load average"),
&["over"],
)?,
registry,
)?,
process_cpu_usage_percentage: register(
Gauge::new(metric_name(prefix, "process_cpu_usage_percentage"), "Process CPU usage")?,
registry,
)?,
process_memory_usage_bytes: register(
Gauge::new(
metric_name(prefix, "process_memory_usage_bytes"),
"Process memory (resident set size) usage",
)?,
registry,
)?,
})
}
}
@@ -89,20 +109,3 @@ impl StandaloneMetrics for GlobalMetrics {
UPDATE_INTERVAL
}
}
impl Default for GlobalMetrics {
fn default() -> Self {
GlobalMetrics {
system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
.expect("metric is static and thus valid; qed"),
process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage")
.expect("metric is static and thus valid; qed"),
process_memory_usage_bytes: Gauge::new(
"process_memory_usage_bytes",
"Process memory (resident set size) usage",
)
.expect("metric is static and thus valid; qed"),
}
}
}
+39 -22
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{Metrics, MetricsAddress, MetricsParams, StandaloneMetrics};
use crate::metrics::{Metrics, MetricsAddress, MetricsParams, PrometheusError, StandaloneMetrics};
use crate::{FailedClient, MaybeConnectionError};
use async_trait::async_trait;
@@ -45,9 +45,7 @@ pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC,
}
/// Returns generic relay loop metrics that may be customized and used in one or several relay loops.
pub fn relay_metrics(prefix: String, address: Option<MetricsAddress>) -> LoopMetrics<(), (), ()> {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetrics<(), (), ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
@@ -55,9 +53,9 @@ pub fn relay_metrics(prefix: String, address: Option<MetricsAddress>) -> LoopMet
target_client: (),
loop_metric: None,
},
address,
registry: Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed"),
address: params.address,
registry: params.registry.unwrap_or_else(|| create_metrics_registry(prefix)),
metrics_prefix: params.metrics_prefix,
loop_metric: None,
}
}
@@ -75,6 +73,7 @@ pub struct LoopMetrics<SC, TC, LM> {
relay_loop: Loop<SC, TC, ()>,
address: Option<MetricsAddress>,
registry: Registry,
metrics_prefix: Option<String>,
loop_metric: Option<LM>,
}
@@ -86,11 +85,7 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
}
/// Start building loop metrics using given prefix.
///
/// Panics if `prefix` is empty.
pub fn with_metrics(self, prefix: String, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
pub fn with_metrics(self, prefix: Option<String>, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: self.reconnect_delay,
@@ -99,11 +94,8 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
loop_metric: None,
},
address: params.address,
registry: match params.registry {
Some(registry) => registry,
None => Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed"),
},
registry: params.registry.unwrap_or_else(|| create_metrics_registry(prefix)),
metrics_prefix: params.metrics_prefix,
loop_metric: None,
}
}
@@ -177,21 +169,34 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
/// Add relay loop metrics.
///
/// Loop metrics will be passed to the loop callback.
pub fn loop_metric<NewLM: Metrics>(self, loop_metric: NewLM) -> Result<LoopMetrics<SC, TC, NewLM>, String> {
loop_metric.register(&self.registry)?;
pub fn loop_metric<NewLM: Metrics>(
self,
create_metric: impl FnOnce(&Registry, Option<&str>) -> Result<NewLM, PrometheusError>,
) -> Result<LoopMetrics<SC, TC, NewLM>, String> {
let loop_metric = create_metric(&self.registry, self.metrics_prefix.as_deref()).map_err(|e| e.to_string())?;
Ok(LoopMetrics {
relay_loop: self.relay_loop,
address: self.address,
registry: self.registry,
metrics_prefix: self.metrics_prefix,
loop_metric: Some(loop_metric),
})
}
/// Add standalone metrics.
pub fn standalone_metric<M: StandaloneMetrics>(self, standalone_metrics: M) -> Result<Self, String> {
standalone_metrics.register(&self.registry)?;
standalone_metrics.spawn();
pub fn standalone_metric<M: StandaloneMetrics>(
self,
create_metric: impl FnOnce(&Registry, Option<&str>) -> Result<M, PrometheusError>,
) -> Result<Self, String> {
// since standalone metrics are updating themselves, we may just ignore the fact that the same
// standalone metric is exposed by several loops && only spawn single metric
match create_metric(&self.registry, self.metrics_prefix.as_deref()) {
Ok(standalone_metrics) => standalone_metrics.spawn(),
Err(PrometheusError::AlreadyReg) => (),
Err(e) => return Err(e.to_string()),
}
Ok(self)
}
@@ -200,6 +205,7 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
MetricsParams {
address: self.address,
registry: Some(self.registry),
metrics_prefix: self.metrics_prefix,
}
}
@@ -237,3 +243,14 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
})
}
}
/// Create new registry with global metrics.
fn create_metrics_registry(prefix: Option<String>) -> Registry {
match prefix {
Some(prefix) => {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
Registry::new_custom(Some(prefix), None).expect("only fails if prefix is empty; prefix is not empty; qed")
}
None => Registry::new(),
}
}