From 63ce1b59732736012b34509125efe2bc8fd1966d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 22 Apr 2021 14:44:39 +0300 Subject: [PATCH] Prefix in relay loops logs (#920) * prefix in relay loops logs * fmt --- .../bin-substrate/src/finality_pipeline.rs | 2 +- .../bin-substrate/src/messages_source.rs | 2 +- .../bin-substrate/src/messages_target.rs | 2 +- bridges/relays/exchange/src/exchange.rs | 15 +-- bridges/relays/exchange/src/exchange_loop.rs | 12 +-- bridges/relays/finality/src/finality_loop.rs | 6 +- .../finality/src/finality_loop_tests.rs | 2 +- bridges/relays/finality/src/lib.rs | 2 +- bridges/relays/headers/src/sync_loop.rs | 8 +- bridges/relays/headers/src/sync_types.rs | 2 +- bridges/relays/messages/src/message_lane.rs | 2 +- .../relays/messages/src/message_lane_loop.rs | 25 +++-- bridges/relays/utils/src/initialize.rs | 46 +++++++- bridges/relays/utils/src/relay_loop.rs | 100 +++++++++--------- 14 files changed, 137 insertions(+), 89 deletions(-) diff --git a/bridges/relays/bin-substrate/src/finality_pipeline.rs b/bridges/relays/bin-substrate/src/finality_pipeline.rs index bc8461f6a8..9538f35778 100644 --- a/bridges/relays/bin-substrate/src/finality_pipeline.rs +++ b/bridges/relays/bin-substrate/src/finality_pipeline.rs @@ -96,7 +96,7 @@ where SourceChain: Clone + Chain + Debug, BlockNumberOf: BlockNumberBase, TargetChain: Clone + Chain + Debug, - TargetSign: Clone + Send + Sync, + TargetSign: 'static + Clone + Send + Sync, { const SOURCE_NAME: &'static str = SourceChain::NAME; const TARGET_NAME: &'static str = TargetChain::NAME; diff --git a/bridges/relays/bin-substrate/src/messages_source.rs b/bridges/relays/bin-substrate/src/messages_source.rs index cf98f3276b..0ccf8bbde8 100644 --- a/bridges/relays/bin-substrate/src/messages_source.rs +++ b/bridges/relays/bin-substrate/src/messages_source.rs @@ -93,7 +93,7 @@ impl RelayClient for SubstrateMessagesSource where C: Chain, P: SubstrateMessageLane, - R: Send + Sync, + R: 'static + Send + Sync, I: Send + Sync + Instance, { type Error = SubstrateError; diff --git a/bridges/relays/bin-substrate/src/messages_target.rs b/bridges/relays/bin-substrate/src/messages_target.rs index 1760832730..39f638d7e9 100644 --- a/bridges/relays/bin-substrate/src/messages_target.rs +++ b/bridges/relays/bin-substrate/src/messages_target.rs @@ -93,7 +93,7 @@ impl RelayClient for SubstrateMessagesTarget where C: Chain, P: SubstrateMessageLane, - R: Send + Sync, + R: 'static + Send + Sync, I: Send + Sync + Instance, { type Error = SubstrateError; diff --git a/bridges/relays/exchange/src/exchange.rs b/bridges/relays/exchange/src/exchange.rs index cec0d7cba1..4a2f07fa7f 100644 --- a/bridges/relays/exchange/src/exchange.rs +++ b/bridges/relays/exchange/src/exchange.rs @@ -26,7 +26,7 @@ use std::{ }; /// Transaction proof pipeline. -pub trait TransactionProofPipeline { +pub trait TransactionProofPipeline: 'static { /// Name of the transaction proof source. const SOURCE_NAME: &'static str; /// Name of the transaction proof target. @@ -35,18 +35,21 @@ pub trait TransactionProofPipeline { /// Block type. type Block: SourceBlock; /// Transaction inclusion proof type. - type TransactionProof; + type TransactionProof: 'static + Send + Sync; } /// Block that is participating in exchange. -pub trait SourceBlock { +pub trait SourceBlock: 'static + Send + Sync { /// Block hash type. - type Hash: Clone + Debug + Display; + type Hash: 'static + Clone + Send + Sync + Debug + Display; /// Block number type. - type Number: Debug + type Number: 'static + + Debug + Display + Clone + Copy + + Send + + Sync + Into + std::cmp::Ord + std::ops::Add @@ -61,7 +64,7 @@ pub trait SourceBlock { } /// Transaction that is participating in exchange. -pub trait SourceTransaction { +pub trait SourceTransaction: 'static + Send { /// Transaction hash type. type Hash: Debug + Display; diff --git a/bridges/relays/exchange/src/exchange_loop.rs b/bridges/relays/exchange/src/exchange_loop.rs index b46d34e047..4525c33e36 100644 --- a/bridges/relays/exchange/src/exchange_loop.rs +++ b/bridges/relays/exchange/src/exchange_loop.rs @@ -39,9 +39,9 @@ pub struct TransactionProofsRelayState { } /// Transactions proofs relay storage. -pub trait TransactionProofsRelayStorage: Clone { +pub trait TransactionProofsRelayStorage: 'static + Clone + Send + Sync { /// Associated block number. - type BlockNumber; + type BlockNumber: 'static + Send + Sync; /// Get relay state. fn state(&self) -> TransactionProofsRelayState; @@ -64,7 +64,7 @@ impl InMemoryStorage { } } -impl TransactionProofsRelayStorage for InMemoryStorage { +impl TransactionProofsRelayStorage for InMemoryStorage { type BlockNumber = BlockNumber; fn state(&self) -> TransactionProofsRelayState { @@ -89,7 +89,7 @@ pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, metrics_params: MetricsParams, - exit_signal: impl Future, + exit_signal: impl Future + 'static + Send, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); @@ -99,7 +99,7 @@ pub async fn run( .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .expose() .await? - .run(|source_client, target_client, metrics| { + .run(metrics_prefix::

(), move |source_client, target_client, metrics| { run_until_connection_lost( storage.clone(), source_client, @@ -117,7 +117,7 @@ async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, metrics_exch: Option, - exit_signal: impl Future, + exit_signal: impl Future + Send, ) -> Result<(), FailedClient> { let mut retry_backoff = retry_backoff(); let mut state = storage.state(); diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index aff32e46de..cce3283990 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -65,7 +65,7 @@ pub struct FinalitySyncParams { pub trait SourceClient: RelayClient { /// Stream of new finality proofs. The stream is allowed to miss proofs for some /// headers, even if those headers are mandatory. - type FinalityProofsStream: Stream; + type FinalityProofsStream: Stream + Send; /// Get best finalized block number. async fn best_finalized_block_number(&self) -> Result; @@ -101,7 +101,7 @@ pub async fn run( target_client: impl TargetClient

, sync_params: FinalitySyncParams, metrics_params: MetricsParams, - exit_signal: impl Future, + exit_signal: impl Future + 'static + Send, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) @@ -110,7 +110,7 @@ pub async fn run( .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .expose() .await? - .run(|source_client, target_client, metrics| { + .run(metrics_prefix::

(), move |source_client, target_client, metrics| { run_until_connection_lost( source_client, target_client, diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index eedd902003..f7826ead73 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -106,7 +106,7 @@ impl RelayClient for TestSourceClient { #[async_trait] impl SourceClient for TestSourceClient { - type FinalityProofsStream = Pin>>; + type FinalityProofsStream = Pin + 'static + Send>>; async fn best_finalized_block_number(&self) -> Result { let mut data = self.data.lock(); diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs index d5048aa160..64ec5bed05 100644 --- a/bridges/relays/finality/src/lib.rs +++ b/bridges/relays/finality/src/lib.rs @@ -28,7 +28,7 @@ mod finality_loop; mod finality_loop_tests; /// Finality proofs synchronization pipeline. -pub trait FinalitySyncPipeline: Clone + Debug + Send + Sync { +pub trait FinalitySyncPipeline: 'static + Clone + Debug + Send + Sync { /// Name of the finality proofs source. const SOURCE_NAME: &'static str; /// Name of the finality proofs target. diff --git a/bridges/relays/headers/src/sync_loop.rs b/bridges/relays/headers/src/sync_loop.rs index e4f1b7b045..b204932056 100644 --- a/bridges/relays/headers/src/sync_loop.rs +++ b/bridges/relays/headers/src/sync_loop.rs @@ -102,7 +102,7 @@ pub trait TargetClient: RelayClient { /// Synchronization maintain procedure. #[async_trait] -pub trait SyncMaintain: Clone + Send + Sync { +pub trait SyncMaintain: 'static + Clone + Send + Sync { /// Run custom maintain procedures. This is guaranteed to be called when both source and target /// clients are unoccupied. async fn maintain(&self, _sync: &mut HeadersSync

) {} @@ -125,7 +125,7 @@ pub async fn run>( sync_maintain: impl SyncMaintain

, sync_params: HeadersSyncParams, metrics_params: MetricsParams, - exit_signal: impl Future, + exit_signal: impl Future + 'static + Send, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) @@ -134,7 +134,7 @@ pub async fn run>( .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .expose() .await? - .run(|source_client, target_client, metrics| { + .run(metrics_prefix::

(), move |source_client, target_client, metrics| { run_until_connection_lost( source_client, source_tick, @@ -159,7 +159,7 @@ async fn run_until_connection_lost>( sync_maintain: impl SyncMaintain

, sync_params: HeadersSyncParams, metrics_sync: Option, - exit_signal: impl Future, + exit_signal: impl Future + Send, ) -> Result<(), FailedClient> { let mut progress_context = (Instant::now(), None, None); diff --git a/bridges/relays/headers/src/sync_types.rs b/bridges/relays/headers/src/sync_types.rs index e6500ad5fa..5809ebab59 100644 --- a/bridges/relays/headers/src/sync_types.rs +++ b/bridges/relays/headers/src/sync_types.rs @@ -43,7 +43,7 @@ pub enum HeaderStatus { } /// Headers synchronization pipeline. -pub trait HeadersSyncPipeline: Clone + Send + Sync { +pub trait HeadersSyncPipeline: 'static + Clone + Send + Sync { /// Name of the headers source. const SOURCE_NAME: &'static str; /// Name of the headers target. diff --git a/bridges/relays/messages/src/message_lane.rs b/bridges/relays/messages/src/message_lane.rs index 5090ef124e..6473ec9875 100644 --- a/bridges/relays/messages/src/message_lane.rs +++ b/bridges/relays/messages/src/message_lane.rs @@ -23,7 +23,7 @@ use relay_utils::{BlockNumberBase, HeaderId}; use std::fmt::Debug; /// One-way message lane. -pub trait MessageLane: Clone + Send + Sync { +pub trait MessageLane: 'static + Clone + Send + Sync { /// Name of the messages source. const SOURCE_NAME: &'static str; /// Name of the messages target. diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index 41eee606d8..d43fecf9b1 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -227,7 +227,7 @@ pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, metrics_params: MetricsParams, - exit_signal: impl Future, + exit_signal: impl Future + Send + 'static, ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) @@ -237,15 +237,18 @@ pub async fn run( .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .expose() .await? - .run(|source_client, target_client, metrics| { - run_until_connection_lost( - params.clone(), - source_client, - target_client, - metrics, - exit_signal.clone(), - ) - }) + .run( + metrics_prefix::

(¶ms.lane), + move |source_client, target_client, metrics| { + run_until_connection_lost( + params.clone(), + source_client, + target_client, + metrics, + exit_signal.clone(), + ) + }, + ) .await } @@ -701,7 +704,7 @@ pub(crate) mod tests { data: TestClientData, source_tick: Arc, target_tick: Arc, - exit_signal: impl Future, + exit_signal: impl Future + 'static + Send, ) -> TestClientData { async_std::task::block_on(async { let data = Arc::new(Mutex::new(data)); diff --git a/bridges/relays/utils/src/initialize.rs b/bridges/relays/utils/src/initialize.rs index 7d5f66a538..b87937923b 100644 --- a/bridges/relays/utils/src/initialize.rs +++ b/bridges/relays/utils/src/initialize.rs @@ -16,7 +16,11 @@ //! Relayer initialization functions. -use std::{fmt::Display, io::Write}; +use std::{cell::RefCell, fmt::Display, io::Write}; + +async_std::task_local! { + pub(crate) static LOOP_NAME: RefCell = RefCell::new(String::default()); +} /// Initialize relay environment. pub fn initialize_relay() { @@ -43,20 +47,56 @@ pub fn initialize_logger(with_timestamp: bool) { Either::Right(ansi_term::Colour::Fixed(8).bold().paint(timestamp)) }; - writeln!(buf, "{} {} {} {}", timestamp, log_level, log_target, record.args(),) + writeln!( + buf, + "{}{} {} {} {}", + loop_name_prefix(), + timestamp, + log_level, + log_target, + record.args(), + ) }); } else { builder.format(move |buf, record| { let log_level = color_level(record.level()); let log_target = color_target(record.target()); - writeln!(buf, "{} {} {}", log_level, log_target, record.args(),) + writeln!( + buf, + "{}{} {} {}", + loop_name_prefix(), + log_level, + log_target, + record.args(), + ) }); } builder.init(); } +/// Initialize relay loop. Must only be called once per every loop task. +pub(crate) fn initialize_loop(loop_name: String) { + LOOP_NAME.with(|g_loop_name| *g_loop_name.borrow_mut() = loop_name); +} + +/// Returns loop name prefix to use in logs. The prefix is initialized with the `initialize_loop` call. +fn loop_name_prefix() -> String { + // try_with to avoid panic outside of async-std task context + LOOP_NAME + .try_with(|loop_name| { + // using borrow is ok here, because loop is only initialized once (=> borrow_mut will only be called once) + let loop_name = loop_name.borrow(); + if loop_name.is_empty() { + String::new() + } else { + format!("[{}] ", loop_name) + } + }) + .unwrap_or_else(|_| String::new()) +} + enum Either { Left(A), Right(B), diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index 8790b0913e..ea2e7623ee 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -26,9 +26,9 @@ pub const RECONNECT_DELAY: Duration = Duration::from_secs(10); /// Basic blockchain client from relay perspective. #[async_trait] -pub trait Client: Clone + Send + Sync { +pub trait Client: 'static + Clone + Send + Sync { /// Type of error this clients returns. - type Error: Debug + MaybeConnectionError; + type Error: 'static + Debug + MaybeConnectionError + Send + Sync; /// Try to reconnect to source node. async fn reconnect(&mut self) -> Result<(), Self::Error>; @@ -105,63 +105,65 @@ impl Loop { /// This function represents an outer loop, which in turn calls provided `run_loop` function to do /// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source, /// target or both) and calls `run_loop` again. - pub async fn run(mut self, run_loop: R) -> Result<(), String> + pub async fn run(mut self, loop_name: String, run_loop: R) -> Result<(), String> where - R: Fn(SC, TC, Option) -> F, - F: Future>, - SC: Client, - TC: Client, - LM: Clone, + R: 'static + Send + Fn(SC, TC, Option) -> F, + F: 'static + Send + Future>, + SC: 'static + Client, + TC: 'static + Client, + LM: 'static + Send + Clone, { - loop { - let result = run_loop( - self.source_client.clone(), - self.target_client.clone(), - self.loop_metric.clone(), - ) - .await; + async_std::task::spawn(async move { + crate::initialize::initialize_loop(loop_name); - match result { - Ok(()) => break, - Err(failed_client) => loop { - async_std::task::sleep(self.reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - match self.source_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to source client. Going to retry in {}s: {:?}", - self.reconnect_delay.as_secs(), - error, - ); - continue; + loop { + let loop_metric = self.loop_metric.clone(); + let future_result = run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric); + let result = future_result.await; + + match result { + Ok(()) => break, + Err(failed_client) => loop { + async_std::task::sleep(self.reconnect_delay).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + match self.source_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to source client. Going to retry in {}s: {:?}", + self.reconnect_delay.as_secs(), + error, + ); + continue; + } } } - } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - match self.target_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to target client. Going to retry in {}s: {:?}", - self.reconnect_delay.as_secs(), - error, - ); - continue; + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + match self.target_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to target client. Going to retry in {}s: {:?}", + self.reconnect_delay.as_secs(), + error, + ); + continue; + } } } - } - break; - }, + break; + }, + } + + log::debug!(target: "bridge", "Restarting relay loop"); } - log::debug!(target: "bridge", "Restarting relay loop"); - } - - Ok(()) + Ok(()) + }) + .await } }