mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 22:37:57 +00:00
Changed how relay loops are started (#840)
* slightly changed relay loop initialization * git mv * clippy * more clippy * loop_run -> run_loop * review and clippy * clippy
This commit is contained in:
committed by
Bastian Köcher
parent
8d122b03f1
commit
a17c7eb80c
@@ -385,7 +385,7 @@ async fn run_auto_transactions_relay_loop(
|
||||
metrics_params,
|
||||
futures::future::pending(),
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -258,8 +258,8 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
|
||||
instance,
|
||||
} = params;
|
||||
|
||||
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?;
|
||||
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?;
|
||||
let eth_client = EthereumClient::new(eth_params).await?;
|
||||
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await?;
|
||||
|
||||
let sign_sub_transactions = match sync_params.target_tx_mode {
|
||||
TargetTransactionMode::Signed | TargetTransactionMode::Backup => true,
|
||||
@@ -279,7 +279,8 @@ pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
|
||||
metrics_params,
|
||||
futures::future::pending(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.map_err(RpcError::SyncLoop)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ pub enum RpcError {
|
||||
Ethereum(EthereumNodeError),
|
||||
/// An error occured when interacting with a Substrate node.
|
||||
Substrate(SubstrateNodeError),
|
||||
/// Error running relay loop.
|
||||
SyncLoop(String),
|
||||
}
|
||||
|
||||
impl From<RpcError> for String {
|
||||
@@ -37,6 +39,7 @@ impl From<RpcError> for String {
|
||||
RpcError::Serialization(e) => e.to_string(),
|
||||
RpcError::Ethereum(e) => e.to_string(),
|
||||
RpcError::Substrate(e) => e.to_string(),
|
||||
RpcError::SyncLoop(e) => e,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,8 +173,8 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
|
||||
metrics_params,
|
||||
} = params;
|
||||
|
||||
let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?;
|
||||
let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?;
|
||||
let eth_client = EthereumClient::new(eth_params).await?;
|
||||
let sub_client = SubstrateClient::<Rialto>::new(sub_params).await?;
|
||||
|
||||
let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign);
|
||||
let source = SubstrateHeadersSource::new(sub_client);
|
||||
@@ -189,7 +189,8 @@ pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
|
||||
metrics_params,
|
||||
futures::future::pending(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.map_err(RpcError::SyncLoop)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -98,7 +98,8 @@ pub async fn run<SourceChain, TargetChain, P>(
|
||||
source_client: Client<SourceChain>,
|
||||
target_client: Client<TargetChain>,
|
||||
metrics_params: Option<relay_utils::metrics::MetricsParams>,
|
||||
) where
|
||||
) -> Result<(), String>
|
||||
where
|
||||
P: SubstrateFinalitySyncPipeline<
|
||||
Hash = HashOf<SourceChain>,
|
||||
Number = BlockNumberOf<SourceChain>,
|
||||
@@ -127,5 +128,5 @@ pub async fn run<SourceChain, TargetChain, P>(
|
||||
metrics_params,
|
||||
futures::future::pending(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -58,12 +58,12 @@ pub async fn run(
|
||||
rialto_client: RialtoClient,
|
||||
rialto_sign: RialtoSigningParams,
|
||||
metrics_params: Option<relay_utils::metrics::MetricsParams>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
crate::finality_pipeline::run(
|
||||
MillauFinalityToRialto::new(rialto_client.clone(), rialto_sign),
|
||||
millau_client,
|
||||
rialto_client,
|
||||
metrics_params,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ pub async fn run(
|
||||
rialto_sign: RialtoSigningParams,
|
||||
lane_id: LaneId,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
let stall_timeout = Duration::from_secs(5 * 60);
|
||||
let relayer_id_at_millau = millau_sign.signer.public().as_array_ref().clone().into();
|
||||
|
||||
@@ -186,5 +186,5 @@ pub async fn run(
|
||||
metrics_params,
|
||||
futures::future::pending(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -151,7 +151,7 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> {
|
||||
let millau_client = millau.into_client().await?;
|
||||
let rialto_client = rialto.into_client().await?;
|
||||
let rialto_sign = rialto_sign.parse()?;
|
||||
millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await;
|
||||
millau_headers_to_rialto::run(millau_client, rialto_client, rialto_sign, prometheus_params.into()).await
|
||||
}
|
||||
cli::RelayHeaders::RialtoToMillau {
|
||||
rialto,
|
||||
@@ -162,7 +162,7 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> {
|
||||
let rialto_client = rialto.into_client().await?;
|
||||
let millau_client = millau.into_client().await?;
|
||||
let millau_sign = millau_sign.parse()?;
|
||||
rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await;
|
||||
rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await
|
||||
}
|
||||
cli::RelayHeaders::WestendToMillau {
|
||||
westend,
|
||||
@@ -173,10 +173,9 @@ async fn run_relay_headers(command: cli::RelayHeaders) -> Result<(), String> {
|
||||
let westend_client = westend.into_client().await?;
|
||||
let millau_client = millau.into_client().await?;
|
||||
let millau_sign = millau_sign.parse()?;
|
||||
westend_headers_to_millau::run(westend_client, millau_client, millau_sign, prometheus_params.into()).await;
|
||||
westend_headers_to_millau::run(westend_client, millau_client, millau_sign, prometheus_params.into()).await
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
|
||||
@@ -202,7 +201,7 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
|
||||
lane.into(),
|
||||
prometheus_params.into(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
cli::RelayMessages::RialtoToMillau {
|
||||
rialto,
|
||||
@@ -225,10 +224,9 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> {
|
||||
lane.into(),
|
||||
prometheus_params.into(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_send_message(command: cli::SendMessage) -> Result<(), String> {
|
||||
|
||||
@@ -61,12 +61,12 @@ pub async fn run(
|
||||
millau_client: MillauClient,
|
||||
millau_sign: MillauSigningParams,
|
||||
metrics_params: Option<relay_utils::metrics::MetricsParams>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
crate::finality_pipeline::run(
|
||||
RialtoFinalityToMillau::new(millau_client.clone(), millau_sign),
|
||||
rialto_client,
|
||||
millau_client,
|
||||
metrics_params,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ pub async fn run(
|
||||
millau_sign: MillauSigningParams,
|
||||
lane_id: LaneId,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
let stall_timeout = Duration::from_secs(5 * 60);
|
||||
let relayer_id_at_rialto = rialto_sign.signer.public().as_array_ref().clone().into();
|
||||
|
||||
@@ -185,5 +185,5 @@ pub async fn run(
|
||||
metrics_params,
|
||||
futures::future::pending(),
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -61,12 +61,12 @@ pub async fn run(
|
||||
millau_client: MillauClient,
|
||||
millau_sign: MillauSigningParams,
|
||||
metrics_params: Option<relay_utils::metrics::MetricsParams>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
crate::finality_pipeline::run(
|
||||
WestendFinalityToMillau::new(millau_client.clone(), millau_sign),
|
||||
westend_client,
|
||||
millau_client,
|
||||
metrics_params,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use backoff::backoff::Backoff;
|
||||
use futures::{future::FutureExt, select};
|
||||
use num_traits::One;
|
||||
use relay_utils::{
|
||||
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
|
||||
metrics::{GlobalMetrics, MetricsParams},
|
||||
retry_backoff, FailedClient, MaybeConnectionError,
|
||||
};
|
||||
use std::future::Future;
|
||||
@@ -85,42 +85,25 @@ pub async fn run<P: TransactionProofPipeline>(
|
||||
target_client: impl TargetClient<P>,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
let exit_signal = exit_signal.shared();
|
||||
let metrics_global = GlobalMetrics::default();
|
||||
let metrics_exch = ExchangeLoopMetrics::default();
|
||||
let metrics_enabled = metrics_params.is_some();
|
||||
metrics_start(
|
||||
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
|
||||
metrics_params,
|
||||
&metrics_global,
|
||||
&metrics_exch,
|
||||
);
|
||||
|
||||
relay_utils::relay_loop::run(
|
||||
relay_utils::relay_loop::RECONNECT_DELAY,
|
||||
source_client,
|
||||
target_client,
|
||||
|source_client, target_client| {
|
||||
relay_utils::relay_loop(source_client, target_client)
|
||||
.with_metrics(format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME))
|
||||
.loop_metric(ExchangeLoopMetrics::default())?
|
||||
.standalone_metric(GlobalMetrics::default())?
|
||||
.expose(metrics_params)
|
||||
.await?
|
||||
.run(|source_client, target_client, metrics| {
|
||||
run_until_connection_lost(
|
||||
storage.clone(),
|
||||
source_client,
|
||||
target_client,
|
||||
if metrics_enabled {
|
||||
Some(metrics_global.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
if metrics_enabled {
|
||||
Some(metrics_exch.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
metrics,
|
||||
exit_signal.clone(),
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Run proofs synchronization.
|
||||
@@ -128,7 +111,6 @@ async fn run_until_connection_lost<P: TransactionProofPipeline>(
|
||||
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
|
||||
source_client: impl SourceClient<P>,
|
||||
target_client: impl TargetClient<P>,
|
||||
metrics_global: Option<GlobalMetrics>,
|
||||
metrics_exch: Option<ExchangeLoopMetrics>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), FailedClient> {
|
||||
@@ -151,10 +133,6 @@ async fn run_until_connection_lost<P: TransactionProofPipeline>(
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(ref metrics_global) = metrics_global {
|
||||
metrics_global.update().await;
|
||||
}
|
||||
|
||||
if let Err((is_connection_error, failed_client)) = iteration_result {
|
||||
if is_connection_error {
|
||||
return Err(failed_client);
|
||||
@@ -321,7 +299,7 @@ mod tests {
|
||||
}
|
||||
}));
|
||||
|
||||
async_std::task::block_on(run(
|
||||
let _ = async_std::task::block_on(run(
|
||||
storage,
|
||||
source,
|
||||
target,
|
||||
|
||||
@@ -27,7 +27,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt};
|
||||
use headers_relay::sync_loop_metrics::SyncLoopMetrics;
|
||||
use num_traits::{One, Saturating};
|
||||
use relay_utils::{
|
||||
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
|
||||
metrics::{GlobalMetrics, MetricsParams},
|
||||
relay_loop::Client as RelayClient,
|
||||
retry_backoff, FailedClient, MaybeConnectionError,
|
||||
};
|
||||
@@ -97,43 +97,24 @@ pub async fn run<P: FinalitySyncPipeline>(
|
||||
sync_params: FinalitySyncParams,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
let exit_signal = exit_signal.shared();
|
||||
|
||||
let metrics_global = GlobalMetrics::default();
|
||||
let metrics_sync = SyncLoopMetrics::default();
|
||||
let metrics_enabled = metrics_params.is_some();
|
||||
metrics_start(
|
||||
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME),
|
||||
metrics_params,
|
||||
&metrics_global,
|
||||
&metrics_sync,
|
||||
);
|
||||
|
||||
relay_utils::relay_loop::run(
|
||||
relay_utils::relay_loop::RECONNECT_DELAY,
|
||||
source_client,
|
||||
target_client,
|
||||
|source_client, target_client| {
|
||||
relay_utils::relay_loop(source_client, target_client)
|
||||
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME))
|
||||
.loop_metric(SyncLoopMetrics::default())?
|
||||
.standalone_metric(GlobalMetrics::default())?
|
||||
.expose(metrics_params)
|
||||
.await?
|
||||
.run(|source_client, target_client, metrics| {
|
||||
run_until_connection_lost(
|
||||
source_client,
|
||||
target_client,
|
||||
sync_params.clone(),
|
||||
if metrics_enabled {
|
||||
Some(metrics_global.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
if metrics_enabled {
|
||||
Some(metrics_sync.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
metrics,
|
||||
exit_signal.clone(),
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Unjustified headers container. Ordered by header number.
|
||||
@@ -221,7 +202,6 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
|
||||
source_client: impl SourceClient<P>,
|
||||
target_client: impl TargetClient<P>,
|
||||
sync_params: FinalitySyncParams,
|
||||
metrics_global: Option<GlobalMetrics>,
|
||||
metrics_sync: Option<SyncLoopMetrics>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), FailedClient> {
|
||||
@@ -267,11 +247,6 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
|
||||
)
|
||||
.await;
|
||||
|
||||
// update global metrics
|
||||
if let Some(ref metrics_global) = metrics_global {
|
||||
metrics_global.update().await;
|
||||
}
|
||||
|
||||
// deal with errors
|
||||
let next_tick = match iteration_result {
|
||||
Ok(updated_last_transaction) => {
|
||||
|
||||
@@ -202,7 +202,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
|
||||
stall_timeout: Duration::from_secs(1),
|
||||
};
|
||||
|
||||
async_std::task::block_on(run(
|
||||
let _ = async_std::task::block_on(run(
|
||||
source_client,
|
||||
target_client,
|
||||
sync_params,
|
||||
|
||||
@@ -25,7 +25,7 @@ use futures::{future::FutureExt, stream::StreamExt};
|
||||
use num_traits::{Saturating, Zero};
|
||||
use relay_utils::{
|
||||
format_ids, interval,
|
||||
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
|
||||
metrics::{GlobalMetrics, MetricsParams},
|
||||
process_future_result,
|
||||
relay_loop::Client as RelayClient,
|
||||
retry_backoff, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError,
|
||||
@@ -121,24 +121,15 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
sync_params: HeadersSyncParams,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
let exit_signal = exit_signal.shared();
|
||||
|
||||
let metrics_global = GlobalMetrics::default();
|
||||
let metrics_sync = SyncLoopMetrics::default();
|
||||
let metrics_enabled = metrics_params.is_some();
|
||||
metrics_start(
|
||||
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME),
|
||||
metrics_params,
|
||||
&metrics_global,
|
||||
&metrics_sync,
|
||||
);
|
||||
|
||||
relay_utils::relay_loop::run(
|
||||
relay_utils::relay_loop::RECONNECT_DELAY,
|
||||
source_client,
|
||||
target_client,
|
||||
|source_client, target_client| {
|
||||
relay_utils::relay_loop(source_client, target_client)
|
||||
.with_metrics(format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME))
|
||||
.loop_metric(SyncLoopMetrics::default())?
|
||||
.standalone_metric(GlobalMetrics::default())?
|
||||
.expose(metrics_params)
|
||||
.await?
|
||||
.run(|source_client, target_client, metrics| {
|
||||
run_until_connection_lost(
|
||||
source_client,
|
||||
source_tick,
|
||||
@@ -146,21 +137,11 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
target_tick,
|
||||
sync_maintain.clone(),
|
||||
sync_params.clone(),
|
||||
if metrics_enabled {
|
||||
Some(metrics_global.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
if metrics_enabled {
|
||||
Some(metrics_sync.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
metrics,
|
||||
exit_signal.clone(),
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Run headers synchronization.
|
||||
@@ -172,7 +153,6 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
target_tick: Duration,
|
||||
sync_maintain: impl SyncMaintain<P>,
|
||||
sync_params: HeadersSyncParams,
|
||||
metrics_global: Option<GlobalMetrics>,
|
||||
metrics_sync: Option<SyncLoopMetrics>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), FailedClient> {
|
||||
@@ -438,9 +418,6 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
}
|
||||
|
||||
// update metrics
|
||||
if let Some(ref metrics_global) = metrics_global {
|
||||
metrics_global.update().await;
|
||||
}
|
||||
if let Some(ref metrics_sync) = metrics_sync {
|
||||
metrics_sync.update(&sync);
|
||||
}
|
||||
|
||||
@@ -493,7 +493,7 @@ fn run_sync_loop_test(params: SyncLoopTestParams) {
|
||||
target.data.lock().requires_extra = target_requires_extra;
|
||||
target.data.lock().requires_completion = target_requires_completion;
|
||||
|
||||
async_std::task::block_on(run(
|
||||
let _ = async_std::task::block_on(run(
|
||||
source,
|
||||
test_tick(),
|
||||
target,
|
||||
|
||||
@@ -34,7 +34,7 @@ use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState, Weight};
|
||||
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
|
||||
use relay_utils::{
|
||||
interval,
|
||||
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
|
||||
metrics::{GlobalMetrics, MetricsParams},
|
||||
process_future_result,
|
||||
relay_loop::Client as RelayClient,
|
||||
retry_backoff, FailedClient,
|
||||
@@ -212,47 +212,29 @@ pub async fn run<P: MessageLane>(
|
||||
target_client: impl TargetClient<P>,
|
||||
metrics_params: Option<MetricsParams>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) {
|
||||
) -> Result<(), String> {
|
||||
let exit_signal = exit_signal.shared();
|
||||
let metrics_global = GlobalMetrics::default();
|
||||
let metrics_msg = MessageLaneLoopMetrics::default();
|
||||
let metrics_enabled = metrics_params.is_some();
|
||||
metrics_start(
|
||||
format!(
|
||||
relay_utils::relay_loop(source_client, target_client)
|
||||
.with_metrics(format!(
|
||||
"{}_to_{}_MessageLane_{}",
|
||||
P::SOURCE_NAME,
|
||||
P::TARGET_NAME,
|
||||
hex::encode(params.lane)
|
||||
),
|
||||
metrics_params,
|
||||
&metrics_global,
|
||||
&metrics_msg,
|
||||
);
|
||||
|
||||
relay_utils::relay_loop::run(
|
||||
params.reconnect_delay,
|
||||
source_client,
|
||||
target_client,
|
||||
|source_client, target_client| {
|
||||
))
|
||||
.loop_metric(MessageLaneLoopMetrics::default())?
|
||||
.standalone_metric(GlobalMetrics::default())?
|
||||
.expose(metrics_params)
|
||||
.await?
|
||||
.run(|source_client, target_client, metrics| {
|
||||
run_until_connection_lost(
|
||||
params.clone(),
|
||||
source_client,
|
||||
target_client,
|
||||
if metrics_enabled {
|
||||
Some(metrics_global.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
if metrics_enabled {
|
||||
Some(metrics_msg.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
metrics,
|
||||
exit_signal.clone(),
|
||||
)
|
||||
},
|
||||
)
|
||||
.await;
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received.
|
||||
@@ -260,7 +242,6 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
|
||||
params: Params,
|
||||
source_client: SC,
|
||||
target_client: TC,
|
||||
metrics_global: Option<GlobalMetrics>,
|
||||
metrics_msg: Option<MessageLaneLoopMetrics>,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), FailedClient> {
|
||||
@@ -404,10 +385,6 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref metrics_global) = metrics_global {
|
||||
metrics_global.update().await;
|
||||
}
|
||||
|
||||
if source_client_is_online && source_state_required {
|
||||
log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
|
||||
source_state.set(source_client.state().fuse());
|
||||
@@ -707,7 +684,7 @@ pub(crate) mod tests {
|
||||
data: data.clone(),
|
||||
tick: target_tick,
|
||||
};
|
||||
run(
|
||||
let _ = run(
|
||||
Params {
|
||||
lane: [0, 0, 0, 0],
|
||||
source_tick: Duration::from_millis(100),
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
//! Utilities used by different relays.
|
||||
|
||||
pub use relay_loop::relay_loop;
|
||||
|
||||
use backoff::{backoff::Backoff, ExponentialBackoff};
|
||||
use futures::future::FutureExt;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -14,12 +14,13 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
pub use global::GlobalMetrics;
|
||||
pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64};
|
||||
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use std::net::SocketAddr;
|
||||
use substrate_prometheus_endpoint::init_prometheus;
|
||||
use sysinfo::{ProcessExt, RefreshKind, System, SystemExt};
|
||||
use async_trait::async_trait;
|
||||
use std::time::Duration;
|
||||
|
||||
mod global;
|
||||
|
||||
/// Prometheus endpoint MetricsParams.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -31,62 +32,32 @@ pub struct MetricsParams {
|
||||
}
|
||||
|
||||
/// Metrics API.
|
||||
pub trait Metrics {
|
||||
pub trait Metrics: Clone + Send + Sync + 'static {
|
||||
/// Register metrics in the registry.
|
||||
fn register(&self, registry: &Registry) -> Result<(), String>;
|
||||
}
|
||||
|
||||
/// Global Prometheus metrics.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GlobalMetrics {
|
||||
system: Arc<Mutex<System>>,
|
||||
system_average_load: GaugeVec<F64>,
|
||||
process_cpu_usage_percentage: Gauge<F64>,
|
||||
process_memory_usage_bytes: Gauge<U64>,
|
||||
}
|
||||
/// Standalone metrics API.
|
||||
///
|
||||
/// Metrics of this kind know how to update themselves, so we may just spawn and forget the
|
||||
/// asynchronous self-update task.
|
||||
#[async_trait]
|
||||
pub trait StandaloneMetrics: Metrics {
|
||||
/// Update metric values.
|
||||
async fn update(&self);
|
||||
|
||||
/// Start Prometheus endpoint with given metrics registry.
|
||||
pub fn start(
|
||||
prefix: String,
|
||||
params: Option<MetricsParams>,
|
||||
global_metrics: &GlobalMetrics,
|
||||
extra_metrics: &impl Metrics,
|
||||
) {
|
||||
let params = match params {
|
||||
Some(params) => params,
|
||||
None => return,
|
||||
};
|
||||
/// Metrics update interval.
|
||||
fn update_interval(&self) -> Duration;
|
||||
|
||||
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
|
||||
|
||||
let do_start = move || {
|
||||
let prometheus_socket_addr = SocketAddr::new(
|
||||
params
|
||||
.host
|
||||
.parse()
|
||||
.map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?,
|
||||
params.port,
|
||||
);
|
||||
let metrics_registry =
|
||||
Registry::new_custom(Some(prefix), None).expect("only fails if prefix is empty; prefix is not empty; qed");
|
||||
global_metrics.register(&metrics_registry)?;
|
||||
extra_metrics.register(&metrics_registry)?;
|
||||
/// Spawn the self update task that will keep update metric value at given intervals.
|
||||
fn spawn(self) {
|
||||
async_std::task::spawn(async move {
|
||||
init_prometheus(prometheus_socket_addr, metrics_registry)
|
||||
.await
|
||||
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
|
||||
let update_interval = self.update_interval();
|
||||
loop {
|
||||
self.update().await;
|
||||
async_std::task::sleep(update_interval).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let result: Result<(), String> = do_start();
|
||||
if let Err(err) = result {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to expose metrics: {}",
|
||||
err,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,71 +69,3 @@ impl Default for MetricsParams {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Metrics for GlobalMetrics {
|
||||
fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for GlobalMetrics {
|
||||
fn default() -> Self {
|
||||
GlobalMetrics {
|
||||
system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
|
||||
system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage")
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
process_memory_usage_bytes: Gauge::new(
|
||||
"process_memory_usage_bytes",
|
||||
"Process memory (resident set size) usage",
|
||||
)
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GlobalMetrics {
|
||||
/// Update metrics.
|
||||
pub async fn update(&self) {
|
||||
// update system-wide metrics
|
||||
let mut system = self.system.lock().await;
|
||||
let load = system.get_load_average();
|
||||
self.system_average_load.with_label_values(&["1min"]).set(load.one);
|
||||
self.system_average_load.with_label_values(&["5min"]).set(load.five);
|
||||
self.system_average_load.with_label_values(&["15min"]).set(load.fifteen);
|
||||
|
||||
// update process-related metrics
|
||||
let pid = sysinfo::get_current_pid().expect(
|
||||
"only fails where pid is unavailable (os=unknown || arch=wasm32);\
|
||||
relay is not supposed to run in such MetricsParamss;\
|
||||
qed",
|
||||
);
|
||||
let is_process_refreshed = system.refresh_process(pid);
|
||||
match (is_process_refreshed, system.get_process(pid)) {
|
||||
(true, Some(process_info)) => {
|
||||
let cpu_usage = process_info.cpu_usage() as f64;
|
||||
let memory_usage = process_info.memory() * 1024;
|
||||
log::trace!(
|
||||
target: "bridge-metrics",
|
||||
"Refreshed process metrics: CPU={}, memory={}",
|
||||
cpu_usage,
|
||||
memory_usage,
|
||||
);
|
||||
|
||||
self.process_cpu_usage_percentage
|
||||
.set(if cpu_usage.is_finite() { cpu_usage } else { 0f64 });
|
||||
self.process_memory_usage_bytes.set(memory_usage);
|
||||
}
|
||||
_ => {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to refresh process information. Metrics may show obsolete values",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity Bridges Common.
|
||||
|
||||
// Parity Bridges Common is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity Bridges Common is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Global system-wide Prometheus metrics exposed by relays.
|
||||
|
||||
use crate::metrics::{Metrics, StandaloneMetrics};
|
||||
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use async_trait::async_trait;
|
||||
use std::time::Duration;
|
||||
use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64};
|
||||
use sysinfo::{ProcessExt, RefreshKind, System, SystemExt};
|
||||
|
||||
/// Global metrics update interval.
|
||||
const UPDATE_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Global Prometheus metrics.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GlobalMetrics {
|
||||
system: Arc<Mutex<System>>,
|
||||
system_average_load: GaugeVec<F64>,
|
||||
process_cpu_usage_percentage: Gauge<F64>,
|
||||
process_memory_usage_bytes: Gauge<U64>,
|
||||
}
|
||||
|
||||
impl Metrics for GlobalMetrics {
|
||||
fn register(&self, registry: &Registry) -> Result<(), String> {
|
||||
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
|
||||
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StandaloneMetrics for GlobalMetrics {
|
||||
async fn update(&self) {
|
||||
// update system-wide metrics
|
||||
let mut system = self.system.lock().await;
|
||||
let load = system.get_load_average();
|
||||
self.system_average_load.with_label_values(&["1min"]).set(load.one);
|
||||
self.system_average_load.with_label_values(&["5min"]).set(load.five);
|
||||
self.system_average_load.with_label_values(&["15min"]).set(load.fifteen);
|
||||
|
||||
// update process-related metrics
|
||||
let pid = sysinfo::get_current_pid().expect(
|
||||
"only fails where pid is unavailable (os=unknown || arch=wasm32);\
|
||||
relay is not supposed to run in such MetricsParamss;\
|
||||
qed",
|
||||
);
|
||||
let is_process_refreshed = system.refresh_process(pid);
|
||||
match (is_process_refreshed, system.get_process(pid)) {
|
||||
(true, Some(process_info)) => {
|
||||
let cpu_usage = process_info.cpu_usage() as f64;
|
||||
let memory_usage = process_info.memory() * 1024;
|
||||
log::trace!(
|
||||
target: "bridge-metrics",
|
||||
"Refreshed process metrics: CPU={}, memory={}",
|
||||
cpu_usage,
|
||||
memory_usage,
|
||||
);
|
||||
|
||||
self.process_cpu_usage_percentage
|
||||
.set(if cpu_usage.is_finite() { cpu_usage } else { 0f64 });
|
||||
self.process_memory_usage_bytes.set(memory_usage);
|
||||
}
|
||||
_ => {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to refresh process information. Metrics may show obsolete values",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_interval(&self) -> Duration {
|
||||
UPDATE_INTERVAL
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for GlobalMetrics {
|
||||
fn default() -> Self {
|
||||
GlobalMetrics {
|
||||
system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
|
||||
system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage")
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
process_memory_usage_bytes: Gauge::new(
|
||||
"process_memory_usage_bytes",
|
||||
"Process memory (resident set size) usage",
|
||||
)
|
||||
.expect("metric is static and thus valid; qed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,10 +14,12 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::metrics::{Metrics, MetricsParams, StandaloneMetrics};
|
||||
use crate::{FailedClient, MaybeConnectionError};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use std::{fmt::Debug, future::Future, time::Duration};
|
||||
use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
|
||||
use substrate_prometheus_endpoint::{init_prometheus, Registry};
|
||||
|
||||
/// Default pause between reconnect attempts.
|
||||
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
|
||||
@@ -32,60 +34,164 @@ pub trait Client: Clone + Send + Sync {
|
||||
async fn reconnect(&mut self) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Run relay loop.
|
||||
///
|
||||
/// This function represents an outer loop, which in turn calls provided `loop_run` function to do
|
||||
/// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source,
|
||||
/// target or both) and calls `loop_run` again.
|
||||
pub async fn run<SC: Client, TC: Client, R, F>(
|
||||
reconnect_delay: Duration,
|
||||
mut source_client: SC,
|
||||
mut target_client: TC,
|
||||
loop_run: R,
|
||||
) where
|
||||
R: Fn(SC, TC) -> F,
|
||||
F: Future<Output = Result<(), FailedClient>>,
|
||||
{
|
||||
loop {
|
||||
let result = loop_run(source_client.clone(), target_client.clone()).await;
|
||||
|
||||
match result {
|
||||
Ok(()) => break,
|
||||
Err(failed_client) => loop {
|
||||
async_std::task::sleep(reconnect_delay).await;
|
||||
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
|
||||
match source_client.reconnect().await {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
|
||||
reconnect_delay.as_secs(),
|
||||
error,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
|
||||
match target_client.reconnect().await {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
|
||||
reconnect_delay.as_secs(),
|
||||
error,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
},
|
||||
}
|
||||
|
||||
log::debug!(target: "bridge", "Restarting relay loop");
|
||||
/// Returns generic loop that may be customized and started.
|
||||
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
|
||||
Loop {
|
||||
source_client,
|
||||
target_client,
|
||||
loop_metric: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic relay loop.
|
||||
pub struct Loop<SC, TC, LM> {
|
||||
source_client: SC,
|
||||
target_client: TC,
|
||||
loop_metric: Option<LM>,
|
||||
}
|
||||
|
||||
/// Relay loop metrics builder.
|
||||
pub struct LoopMetrics<SC, TC, LM> {
|
||||
relay_loop: Loop<SC, TC, ()>,
|
||||
registry: Registry,
|
||||
loop_metric: Option<LM>,
|
||||
}
|
||||
|
||||
impl<SC, TC, LM> Loop<SC, TC, LM> {
|
||||
/// Start building loop metrics using given prefix.
|
||||
///
|
||||
/// Panics if `prefix` is empty.
|
||||
pub fn with_metrics(self, prefix: String) -> LoopMetrics<SC, TC, ()> {
|
||||
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
|
||||
|
||||
LoopMetrics {
|
||||
relay_loop: Loop {
|
||||
source_client: self.source_client,
|
||||
target_client: self.target_client,
|
||||
loop_metric: None,
|
||||
},
|
||||
registry: Registry::new_custom(Some(prefix), None)
|
||||
.expect("only fails if prefix is empty; prefix is not empty; qed"),
|
||||
loop_metric: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run relay loop.
|
||||
///
|
||||
/// This function represents an outer loop, which in turn calls provided `run_loop` function to do
|
||||
/// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
|
||||
/// target or both) and calls `run_loop` again.
|
||||
pub async fn run<R, F>(mut self, run_loop: R) -> Result<(), String>
|
||||
where
|
||||
R: Fn(SC, TC, Option<LM>) -> F,
|
||||
F: Future<Output = Result<(), FailedClient>>,
|
||||
SC: Client,
|
||||
TC: Client,
|
||||
LM: Clone,
|
||||
{
|
||||
loop {
|
||||
let result = run_loop(
|
||||
self.source_client.clone(),
|
||||
self.target_client.clone(),
|
||||
self.loop_metric.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => break,
|
||||
Err(failed_client) => loop {
|
||||
async_std::task::sleep(RECONNECT_DELAY).await;
|
||||
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
|
||||
match self.source_client.reconnect().await {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
|
||||
RECONNECT_DELAY.as_secs(),
|
||||
error,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
|
||||
match self.target_client.reconnect().await {
|
||||
Ok(()) => (),
|
||||
Err(error) => {
|
||||
log::warn!(
|
||||
target: "bridge",
|
||||
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
|
||||
RECONNECT_DELAY.as_secs(),
|
||||
error,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
},
|
||||
}
|
||||
|
||||
log::debug!(target: "bridge", "Restarting relay loop");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
|
||||
/// Add relay loop metrics.
|
||||
///
|
||||
/// Loop metrics will be passed to the loop callback.
|
||||
pub fn loop_metric<NewLM: Metrics>(self, loop_metric: NewLM) -> Result<LoopMetrics<SC, TC, NewLM>, String> {
|
||||
loop_metric.register(&self.registry)?;
|
||||
|
||||
Ok(LoopMetrics {
|
||||
relay_loop: self.relay_loop,
|
||||
registry: self.registry,
|
||||
loop_metric: Some(loop_metric),
|
||||
})
|
||||
}
|
||||
|
||||
/// Add standalone metrics.
|
||||
pub fn standalone_metric<M: StandaloneMetrics>(self, standalone_metrics: M) -> Result<Self, String> {
|
||||
standalone_metrics.register(&self.registry)?;
|
||||
standalone_metrics.spawn();
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Expose metrics using given params.
|
||||
///
|
||||
/// If `params` is `None`, metrics are not exposed.
|
||||
pub async fn expose(self, params: Option<MetricsParams>) -> Result<Loop<SC, TC, LM>, String> {
|
||||
if let Some(params) = params {
|
||||
let socket_addr = SocketAddr::new(
|
||||
params.host.parse().map_err(|err| {
|
||||
format!(
|
||||
"Invalid host {} is used to expose Prometheus metrics: {}",
|
||||
params.host, err,
|
||||
)
|
||||
})?,
|
||||
params.port,
|
||||
);
|
||||
|
||||
let registry = self.registry;
|
||||
async_std::task::spawn(async move {
|
||||
let result = init_prometheus(socket_addr, registry).await;
|
||||
log::trace!(
|
||||
target: "bridge-metrics",
|
||||
"Prometheus endpoint has exited with result: {:?}",
|
||||
result,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Loop {
|
||||
source_client: self.relay_loop.source_client,
|
||||
target_client: self.relay_loop.target_client,
|
||||
loop_metric: self.loop_metric,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user