Message lane loop metrics (#400)

* message lane metrics

* fmt and clippy
This commit is contained in:
Svyatoslav Nikolsky
2020-10-06 18:17:24 +03:00
committed by Bastian Köcher
parent 8b8248f83f
commit cfe1e43473
7 changed files with 210 additions and 7 deletions
+5
View File
@@ -9,7 +9,12 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
async-std = "1.6.5"
async-trait = "0.1.40"
futures = "0.3.5"
hex = "0.4"
log = "0.4.11"
num-traits = "0.2"
parking_lot = "0.11.0"
# Bridge Dependencies
bp-message-lane = { path = "../../primitives/message-lane" }
relay-utils = { path = "../utils" }
+2
View File
@@ -25,6 +25,8 @@
#![recursion_limit = "1024"]
#![warn(missing_docs)]
mod metrics;
pub mod message_lane;
pub mod message_lane_loop;
pub mod message_race_delivery;
@@ -37,6 +37,7 @@ pub trait MessageLane {
+ Debug
+ Default
+ From<u32>
+ Into<u64>
+ Ord
+ std::ops::Add<Output = Self::MessageNonce>
+ One
@@ -48,12 +49,12 @@ pub trait MessageLane {
type MessagesReceivingProof: Clone;
/// Number of the source header.
type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq;
type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into<u64>;
/// Hash of the source header.
type SourceHeaderHash: Clone + Debug + Default + PartialEq;
/// Number of the target header.
type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq;
type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into<u64>;
/// Hash of the target header.
type TargetHeaderHash: Clone + Debug + Default + PartialEq;
}
@@ -30,10 +30,16 @@
use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use crate::message_race_delivery::run as run_message_delivery_race;
use crate::message_race_receiving::run as run_message_receiving_race;
use crate::metrics::MessageLaneLoopMetrics;
use async_trait::async_trait;
use bp_message_lane::LaneId;
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
use relay_utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
use relay_utils::{
interval,
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
process_future_result, retry_backoff, FailedClient, MaybeConnectionError,
};
use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
/// Source client trait.
@@ -132,19 +138,37 @@ pub struct ClientsState<P: MessageLane> {
}
/// Run message lane service loop.
#[allow(clippy::too_many_arguments)]
pub fn run<P: MessageLane>(
lane: LaneId,
mut source_client: impl SourceClient<P>,
source_tick: Duration,
mut target_client: impl TargetClient<P>,
target_tick: Duration,
reconnect_delay: Duration,
stall_timeout: Duration,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
let mut local_pool = futures::executor::LocalPool::new();
let exit_signal = exit_signal.shared();
local_pool.run_until(async move {
let mut metrics_global = GlobalMetrics::default();
let metrics_msg = MessageLaneLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!(
"{}_to_{}_MessageLoop/{}",
P::SOURCE_NAME,
P::TARGET_NAME,
hex::encode(lane)
),
metrics_params,
&metrics_global,
&metrics_msg,
);
loop {
let result = run_until_connection_lost(
source_client.clone(),
@@ -152,6 +176,16 @@ pub fn run<P: MessageLane>(
target_client.clone(),
target_tick,
stall_timeout,
if metrics_enabled {
Some(&mut metrics_global)
} else {
None
},
if metrics_enabled {
Some(metrics_msg.clone())
} else {
None
},
exit_signal.clone(),
)
.await;
@@ -180,12 +214,15 @@ pub fn run<P: MessageLane>(
}
/// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received.
#[allow(clippy::too_many_arguments)]
async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
source_client: SC,
source_tick: Duration,
target_client: TC,
target_tick: Duration,
stall_timeout: Duration,
mut metrics_global: Option<&mut GlobalMetrics>,
metrics_msg: Option<MessageLaneLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let mut source_retry_backoff = retry_backoff();
@@ -212,6 +249,7 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
target_client.clone(),
delivery_target_state_receiver,
stall_timeout,
metrics_msg.clone(),
)
.fuse();
@@ -225,6 +263,7 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
target_client.clone(),
receiving_target_state_receiver,
stall_timeout,
metrics_msg.clone(),
)
.fuse();
@@ -259,6 +298,10 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
);
let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone());
let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone());
if let Some(metrics_msg) = metrics_msg.as_ref() {
metrics_msg.update_source_state::<P>(new_source_state);
}
},
&mut source_go_offline_future,
|delay| async_std::task::sleep(delay),
@@ -286,6 +329,10 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
);
let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone());
let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone());
if let Some(metrics_msg) = metrics_msg.as_ref() {
metrics_msg.update_target_state::<P>(new_target_state);
}
},
&mut target_go_offline_future,
|delay| async_std::task::sleep(delay),
@@ -317,6 +364,10 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
}
}
if let Some(metrics_global) = metrics_global.as_mut() {
metrics_global.update();
}
if source_client_is_online && source_state_required {
log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
source_state.set(source_client.state().fuse());
@@ -562,12 +613,14 @@ pub(crate) mod tests {
tick: target_tick,
};
run(
[0, 0, 0, 0],
source_client,
Duration::from_millis(100),
target_client,
Duration::from_millis(100),
Duration::from_millis(0),
Duration::from_secs(60),
None,
exit_signal,
);
@@ -19,6 +19,7 @@ use crate::message_lane_loop::{
TargetClientState,
};
use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient};
use crate::metrics::MessageLaneLoopMetrics;
use async_trait::async_trait;
use futures::stream::FusedStream;
@@ -36,15 +37,18 @@ pub async fn run<P: MessageLane>(
target_client: impl MessageLaneTargetClient<P>,
target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
metrics_msg: Option<MessageLaneLoopMetrics>,
) -> Result<(), FailedClient> {
crate::message_race_loop::run(
MessageDeliveryRaceSource {
client: source_client,
metrics_msg: metrics_msg.clone(),
_phantom: Default::default(),
},
source_state_updates,
MessageDeliveryRaceTarget {
client: target_client,
metrics_msg,
_phantom: Default::default(),
},
target_state_updates,
@@ -76,6 +80,7 @@ impl<P: MessageLane> MessageRace for MessageDeliveryRace<P> {
/// Message delivery race source, which is a source of the lane.
struct MessageDeliveryRaceSource<P: MessageLane, C> {
client: C,
metrics_msg: Option<MessageLaneLoopMetrics>,
_phantom: PhantomData<P>,
}
@@ -91,7 +96,13 @@ where
&self,
at_block: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error> {
self.client.latest_generated_nonce(at_block).await
let result = self.client.latest_generated_nonce(at_block).await;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, source_latest_generated_nonce)) = result.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(*source_latest_generated_nonce);
}
}
result
}
async fn generate_proof(
@@ -106,6 +117,7 @@ where
/// Message delivery race target, which is a target of the lane.
struct MessageDeliveryRaceTarget<P: MessageLane, C> {
client: C,
metrics_msg: Option<MessageLaneLoopMetrics>,
_phantom: PhantomData<P>,
}
@@ -121,7 +133,13 @@ where
&self,
at_block: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error> {
self.client.latest_received_nonce(at_block).await
let result = self.client.latest_received_nonce(at_block).await;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, target_latest_received_nonce)) = result.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(*target_latest_received_nonce);
}
}
result
}
async fn submit_proof(
@@ -20,6 +20,7 @@ use crate::message_lane_loop::{
};
use crate::message_race_delivery::DeliveryStrategy;
use crate::message_race_loop::{MessageRace, SourceClient, TargetClient};
use crate::metrics::MessageLaneLoopMetrics;
use async_trait::async_trait;
use futures::stream::FusedStream;
@@ -43,15 +44,18 @@ pub async fn run<P: MessageLane>(
target_client: impl MessageLaneTargetClient<P>,
target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
metrics_msg: Option<MessageLaneLoopMetrics>,
) -> Result<(), FailedClient> {
crate::message_race_loop::run(
ReceivingConfirmationsRaceSource {
client: target_client,
metrics_msg: metrics_msg.clone(),
_phantom: Default::default(),
},
target_state_updates,
ReceivingConfirmationsRaceTarget {
client: source_client,
metrics_msg,
_phantom: Default::default(),
},
source_state_updates,
@@ -83,6 +87,7 @@ impl<P: MessageLane> MessageRace for ReceivingConfirmationsRace<P> {
/// Message receiving confirmations race source, which is a target of the lane.
struct ReceivingConfirmationsRaceSource<P: MessageLane, C> {
client: C,
metrics_msg: Option<MessageLaneLoopMetrics>,
_phantom: PhantomData<P>,
}
@@ -98,7 +103,13 @@ where
&self,
at_block: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error> {
self.client.latest_received_nonce(at_block).await
let result = self.client.latest_received_nonce(at_block).await;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, target_latest_received_nonce)) = result.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(*target_latest_received_nonce);
}
}
result
}
async fn generate_proof(
@@ -123,6 +134,7 @@ where
/// Message receiving confirmations race target, which is a source of the lane.
struct ReceivingConfirmationsRaceTarget<P: MessageLane, C> {
client: C,
metrics_msg: Option<MessageLaneLoopMetrics>,
_phantom: PhantomData<P>,
}
@@ -138,7 +150,13 @@ where
&self,
at_block: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error> {
self.client.latest_confirmed_received_nonce(at_block).await
let result = self.client.latest_confirmed_received_nonce(at_block).await;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, source_latest_confirmed_nonce)) = result.as_ref() {
metrics_msg.update_source_latest_confirmed_nonce::<P>(*source_latest_confirmed_nonce);
}
}
result
}
async fn submit_proof(
@@ -0,0 +1,106 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Metrics for message lane relay loop.
use crate::message_lane::MessageLane;
use crate::message_lane_loop::{SourceClientState, TargetClientState};
use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
/// Message lane relay metrics.
///
/// Cloning only clones references.
#[derive(Clone)]
pub struct MessageLaneLoopMetrics {
/// Best finalized block numbers - "source", "target", "source_at_target", "target_at_source".
best_block_numbers: GaugeVec<U64>,
/// Lane state nonces: "source_latest_generated", "source_latest_confirmed",
/// "target_latest_received", "target_latest_confirmed".
lane_state_nonces: GaugeVec<U64>,
}
impl Metrics for MessageLaneLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.lane_state_nonces.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
impl Default for MessageLaneLoopMetrics {
fn default() -> Self {
MessageLaneLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best finalized block numbers"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
lane_state_nonces: GaugeVec::new(Opts::new("lane_state_nonces", "Nonces of the lane state"), &["type"])
.expect("metric is static and thus valid; qed"),
}
}
}
impl MessageLaneLoopMetrics {
/// Update source client state metrics.
pub fn update_source_state<P: MessageLane>(&self, source_client_state: SourceClientState<P>) {
self.best_block_numbers
.with_label_values(&["source"])
.set(source_client_state.best_self.0.into());
self.best_block_numbers
.with_label_values(&["target_at_source"])
.set(source_client_state.best_peer.0.into());
}
/// Update target client state metrics.
pub fn update_target_state<P: MessageLane>(&self, target_client_state: TargetClientState<P>) {
self.best_block_numbers
.with_label_values(&["target"])
.set(target_client_state.best_self.0.into());
self.best_block_numbers
.with_label_values(&["source_at_target"])
.set(target_client_state.best_peer.0.into());
}
/// Update latest generated nonce at source.
pub fn update_source_latest_generated_nonce<P: MessageLane>(&self, source_latest_generated_nonce: P::MessageNonce) {
self.lane_state_nonces
.with_label_values(&["source_latest_generated"])
.set(source_latest_generated_nonce.into());
}
/// Update latest confirmed nonce at source.
pub fn update_source_latest_confirmed_nonce<P: MessageLane>(&self, source_latest_confirmed_nonce: P::MessageNonce) {
self.lane_state_nonces
.with_label_values(&["source_latest_confirmed"])
.set(source_latest_confirmed_nonce.into());
}
/// Update latest received nonce at target.
pub fn update_target_latest_received_nonce<P: MessageLane>(&self, target_latest_generated_nonce: P::MessageNonce) {
self.lane_state_nonces
.with_label_values(&["target_latest_received"])
.set(target_latest_generated_nonce.into());
}
/// Update latest confirmed nonce at target.
pub fn update_target_latest_confirmed_nonce<P: MessageLane>(&self, target_latest_confirmed_nonce: P::MessageNonce) {
self.lane_state_nonces
.with_label_values(&["target_latest_confirmed"])
.set(target_latest_confirmed_nonce.into());
}
}