diff --git a/bridges/relays/messages-relay/Cargo.toml b/bridges/relays/messages-relay/Cargo.toml index f876b38742..aa76679351 100644 --- a/bridges/relays/messages-relay/Cargo.toml +++ b/bridges/relays/messages-relay/Cargo.toml @@ -9,7 +9,12 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0" async-std = "1.6.5" async-trait = "0.1.40" futures = "0.3.5" +hex = "0.4" log = "0.4.11" num-traits = "0.2" parking_lot = "0.11.0" + +# Bridge Dependencies + +bp-message-lane = { path = "../../primitives/message-lane" } relay-utils = { path = "../utils" } diff --git a/bridges/relays/messages-relay/src/lib.rs b/bridges/relays/messages-relay/src/lib.rs index fe764071cf..d1820140d8 100644 --- a/bridges/relays/messages-relay/src/lib.rs +++ b/bridges/relays/messages-relay/src/lib.rs @@ -25,6 +25,8 @@ #![recursion_limit = "1024"] #![warn(missing_docs)] +mod metrics; + pub mod message_lane; pub mod message_lane_loop; pub mod message_race_delivery; diff --git a/bridges/relays/messages-relay/src/message_lane.rs b/bridges/relays/messages-relay/src/message_lane.rs index 9db30926a0..3869e6f24d 100644 --- a/bridges/relays/messages-relay/src/message_lane.rs +++ b/bridges/relays/messages-relay/src/message_lane.rs @@ -37,6 +37,7 @@ pub trait MessageLane { + Debug + Default + From + + Into + Ord + std::ops::Add + One @@ -48,12 +49,12 @@ pub trait MessageLane { type MessagesReceivingProof: Clone; /// Number of the source header. - type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq; + type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into; /// Hash of the source header. type SourceHeaderHash: Clone + Debug + Default + PartialEq; /// Number of the target header. - type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq; + type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into; /// Hash of the target header. 
type TargetHeaderHash: Clone + Debug + Default + PartialEq; } diff --git a/bridges/relays/messages-relay/src/message_lane_loop.rs b/bridges/relays/messages-relay/src/message_lane_loop.rs index 91d3017e93..b2eb7d2820 100644 --- a/bridges/relays/messages-relay/src/message_lane_loop.rs +++ b/bridges/relays/messages-relay/src/message_lane_loop.rs @@ -30,10 +30,16 @@ use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; use crate::message_race_delivery::run as run_message_delivery_race; use crate::message_race_receiving::run as run_message_receiving_race; +use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; +use bp_message_lane::LaneId; use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; -use relay_utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError}; +use relay_utils::{ + interval, + metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, + process_future_result, retry_backoff, FailedClient, MaybeConnectionError, +}; use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; /// Source client trait. @@ -132,19 +138,37 @@ pub struct ClientsState { } /// Run message lane service loop. +#[allow(clippy::too_many_arguments)] pub fn run( + lane: LaneId, mut source_client: impl SourceClient

, source_tick: Duration, mut target_client: impl TargetClient

, target_tick: Duration, reconnect_delay: Duration, stall_timeout: Duration, + metrics_params: Option, exit_signal: impl Future, ) { let mut local_pool = futures::executor::LocalPool::new(); let exit_signal = exit_signal.shared(); local_pool.run_until(async move { + let mut metrics_global = GlobalMetrics::default(); + let metrics_msg = MessageLaneLoopMetrics::default(); + let metrics_enabled = metrics_params.is_some(); + metrics_start( + format!( + "{}_to_{}_MessageLoop/{}", + P::SOURCE_NAME, + P::TARGET_NAME, + hex::encode(lane) + ), + metrics_params, + &metrics_global, + &metrics_msg, + ); + loop { let result = run_until_connection_lost( source_client.clone(), @@ -152,6 +176,16 @@ pub fn run( target_client.clone(), target_tick, stall_timeout, + if metrics_enabled { + Some(&mut metrics_global) + } else { + None + }, + if metrics_enabled { + Some(metrics_msg.clone()) + } else { + None + }, exit_signal.clone(), ) .await; @@ -180,12 +214,15 @@ pub fn run( } /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. +#[allow(clippy::too_many_arguments)] async fn run_until_connection_lost, TC: TargetClient

>( source_client: SC, source_tick: Duration, target_client: TC, target_tick: Duration, stall_timeout: Duration, + mut metrics_global: Option<&mut GlobalMetrics>, + metrics_msg: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { let mut source_retry_backoff = retry_backoff(); @@ -212,6 +249,7 @@ async fn run_until_connection_lost, TC: Targ target_client.clone(), delivery_target_state_receiver, stall_timeout, + metrics_msg.clone(), ) .fuse(); @@ -225,6 +263,7 @@ async fn run_until_connection_lost, TC: Targ target_client.clone(), receiving_target_state_receiver, stall_timeout, + metrics_msg.clone(), ) .fuse(); @@ -259,6 +298,10 @@ async fn run_until_connection_lost, TC: Targ ); let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone()); let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone()); + + if let Some(metrics_msg) = metrics_msg.as_ref() { + metrics_msg.update_source_state::

(new_source_state); + } }, &mut source_go_offline_future, |delay| async_std::task::sleep(delay), @@ -286,6 +329,10 @@ async fn run_until_connection_lost, TC: Targ ); let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone()); let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone()); + + if let Some(metrics_msg) = metrics_msg.as_ref() { + metrics_msg.update_target_state::

(new_target_state); + } }, &mut target_go_offline_future, |delay| async_std::task::sleep(delay), @@ -317,6 +364,10 @@ async fn run_until_connection_lost, TC: Targ } } + if let Some(metrics_global) = metrics_global.as_mut() { + metrics_global.update(); + } + if source_client_is_online && source_state_required { log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME); source_state.set(source_client.state().fuse()); @@ -562,12 +613,14 @@ pub(crate) mod tests { tick: target_tick, }; run( + [0, 0, 0, 0], source_client, Duration::from_millis(100), target_client, Duration::from_millis(100), Duration::from_millis(0), Duration::from_secs(60), + None, exit_signal, ); diff --git a/bridges/relays/messages-relay/src/message_race_delivery.rs b/bridges/relays/messages-relay/src/message_race_delivery.rs index 7cac96c01b..d725f25687 100644 --- a/bridges/relays/messages-relay/src/message_race_delivery.rs +++ b/bridges/relays/messages-relay/src/message_race_delivery.rs @@ -19,6 +19,7 @@ use crate::message_lane_loop::{ TargetClientState, }; use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient}; +use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; use futures::stream::FusedStream; @@ -36,15 +37,18 @@ pub async fn run( target_client: impl MessageLaneTargetClient

, target_state_updates: impl FusedStream>, stall_timeout: Duration, + metrics_msg: Option, ) -> Result<(), FailedClient> { crate::message_race_loop::run( MessageDeliveryRaceSource { client: source_client, + metrics_msg: metrics_msg.clone(), _phantom: Default::default(), }, source_state_updates, MessageDeliveryRaceTarget { client: target_client, + metrics_msg, _phantom: Default::default(), }, target_state_updates, @@ -76,6 +80,7 @@ impl MessageRace for MessageDeliveryRace

{ /// Message delivery race source, which is a source of the lane. struct MessageDeliveryRaceSource { client: C, + metrics_msg: Option, _phantom: PhantomData

, } @@ -91,7 +96,13 @@ where &self, at_block: SourceHeaderIdOf

, ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { - self.client.latest_generated_nonce(at_block).await + let result = self.client.latest_generated_nonce(at_block).await; + if let Some(metrics_msg) = self.metrics_msg.as_ref() { + if let Ok((_, source_latest_generated_nonce)) = result.as_ref() { + metrics_msg.update_source_latest_generated_nonce::

(*source_latest_generated_nonce); + } + } + result } async fn generate_proof( @@ -106,6 +117,7 @@ where /// Message delivery race target, which is a target of the lane. struct MessageDeliveryRaceTarget { client: C, + metrics_msg: Option, _phantom: PhantomData

, } @@ -121,7 +133,13 @@ where &self, at_block: TargetHeaderIdOf

, ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { - self.client.latest_received_nonce(at_block).await + let result = self.client.latest_received_nonce(at_block).await; + if let Some(metrics_msg) = self.metrics_msg.as_ref() { + if let Ok((_, target_latest_received_nonce)) = result.as_ref() { + metrics_msg.update_target_latest_received_nonce::

(*target_latest_received_nonce); + } + } + result } async fn submit_proof( diff --git a/bridges/relays/messages-relay/src/message_race_receiving.rs b/bridges/relays/messages-relay/src/message_race_receiving.rs index 4eeea7adbd..d985020fdd 100644 --- a/bridges/relays/messages-relay/src/message_race_receiving.rs +++ b/bridges/relays/messages-relay/src/message_race_receiving.rs @@ -20,6 +20,7 @@ use crate::message_lane_loop::{ }; use crate::message_race_delivery::DeliveryStrategy; use crate::message_race_loop::{MessageRace, SourceClient, TargetClient}; +use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; use futures::stream::FusedStream; @@ -43,15 +44,18 @@ pub async fn run( target_client: impl MessageLaneTargetClient

, target_state_updates: impl FusedStream>, stall_timeout: Duration, + metrics_msg: Option, ) -> Result<(), FailedClient> { crate::message_race_loop::run( ReceivingConfirmationsRaceSource { client: target_client, + metrics_msg: metrics_msg.clone(), _phantom: Default::default(), }, target_state_updates, ReceivingConfirmationsRaceTarget { client: source_client, + metrics_msg, _phantom: Default::default(), }, source_state_updates, @@ -83,6 +87,7 @@ impl MessageRace for ReceivingConfirmationsRace

{ /// Message receiving confirmations race source, which is a target of the lane. struct ReceivingConfirmationsRaceSource { client: C, + metrics_msg: Option, _phantom: PhantomData

, } @@ -98,7 +103,13 @@ where &self, at_block: TargetHeaderIdOf

, ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { - self.client.latest_received_nonce(at_block).await + let result = self.client.latest_received_nonce(at_block).await; + if let Some(metrics_msg) = self.metrics_msg.as_ref() { + if let Ok((_, target_latest_received_nonce)) = result.as_ref() { + metrics_msg.update_target_latest_received_nonce::

(*target_latest_received_nonce); + } + } + result } async fn generate_proof( @@ -123,6 +134,7 @@ where /// Message receiving confirmations race target, which is a source of the lane. struct ReceivingConfirmationsRaceTarget { client: C, + metrics_msg: Option, _phantom: PhantomData

, } @@ -138,7 +150,13 @@ where &self, at_block: SourceHeaderIdOf

, ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { - self.client.latest_confirmed_received_nonce(at_block).await + let result = self.client.latest_confirmed_received_nonce(at_block).await; + if let Some(metrics_msg) = self.metrics_msg.as_ref() { + if let Ok((_, source_latest_confirmed_nonce)) = result.as_ref() { + metrics_msg.update_source_latest_confirmed_nonce::

(*source_latest_confirmed_nonce); + } + } + result } async fn submit_proof( diff --git a/bridges/relays/messages-relay/src/metrics.rs b/bridges/relays/messages-relay/src/metrics.rs new file mode 100644 index 0000000000..93f4259c10 --- /dev/null +++ b/bridges/relays/messages-relay/src/metrics.rs @@ -0,0 +1,106 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Metrics for message lane relay loop. + +use crate::message_lane::MessageLane; +use crate::message_lane_loop::{SourceClientState, TargetClientState}; + +use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64}; + +/// Message lane relay metrics. +/// +/// Cloning only clones references. +#[derive(Clone)] +pub struct MessageLaneLoopMetrics { + /// Best finalized block numbers - "source", "target", "source_at_target", "target_at_source". + best_block_numbers: GaugeVec, + /// Lane state nonces: "source_latest_generated", "source_latest_confirmed", + /// "target_latest_received", "target_latest_confirmed". 
+ lane_state_nonces: GaugeVec, +} + +impl Metrics for MessageLaneLoopMetrics { + fn register(&self, registry: &Registry) -> Result<(), String> { + register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?; + register(self.lane_state_nonces.clone(), registry).map_err(|e| e.to_string())?; + Ok(()) + } +} + +impl Default for MessageLaneLoopMetrics { + fn default() -> Self { + MessageLaneLoopMetrics { + best_block_numbers: GaugeVec::new( + Opts::new("best_block_numbers", "Best finalized block numbers"), + &["type"], + ) + .expect("metric is static and thus valid; qed"), + lane_state_nonces: GaugeVec::new(Opts::new("lane_state_nonces", "Nonces of the lane state"), &["type"]) + .expect("metric is static and thus valid; qed"), + } + } +} + +impl MessageLaneLoopMetrics { + /// Update source client state metrics. + pub fn update_source_state(&self, source_client_state: SourceClientState

) { + self.best_block_numbers + .with_label_values(&["source"]) + .set(source_client_state.best_self.0.into()); + self.best_block_numbers + .with_label_values(&["target_at_source"]) + .set(source_client_state.best_peer.0.into()); + } + + /// Update target client state metrics. + pub fn update_target_state(&self, target_client_state: TargetClientState

) { + self.best_block_numbers + .with_label_values(&["target"]) + .set(target_client_state.best_self.0.into()); + self.best_block_numbers + .with_label_values(&["source_at_target"]) + .set(target_client_state.best_peer.0.into()); + } + + /// Update latest generated nonce at source. + pub fn update_source_latest_generated_nonce(&self, source_latest_generated_nonce: P::MessageNonce) { + self.lane_state_nonces + .with_label_values(&["source_latest_generated"]) + .set(source_latest_generated_nonce.into()); + } + + /// Update latest confirmed nonce at source. + pub fn update_source_latest_confirmed_nonce(&self, source_latest_confirmed_nonce: P::MessageNonce) { + self.lane_state_nonces + .with_label_values(&["source_latest_confirmed"]) + .set(source_latest_confirmed_nonce.into()); + } + + /// Update latest received nonce at target. + pub fn update_target_latest_received_nonce(&self, target_latest_generated_nonce: P::MessageNonce) { + self.lane_state_nonces + .with_label_values(&["target_latest_received"]) + .set(target_latest_generated_nonce.into()); + } + + /// Update latest confirmed nonce at target. + pub fn update_target_latest_confirmed_nonce(&self, target_latest_confirmed_nonce: P::MessageNonce) { + self.lane_state_nonces + .with_label_values(&["target_latest_confirmed"]) + .set(target_latest_confirmed_nonce.into()); + } +}