From 38cd3a328d01e75f9160ed0f0f5c151af88d2979 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 26 Apr 2021 16:05:02 +0300 Subject: [PATCH] Do not spawn additional task for on-demand relays (#933) * do not spawn additional task for on-demand relays * compilation --- .../bin-substrate/src/cli/relay_headers.rs | 1 + .../bin-substrate/src/finality_pipeline.rs | 2 ++ .../bin-substrate/src/on_demand_headers.rs | 23 +++++++++------ bridges/relays/finality/src/finality_loop.rs | 3 ++ .../finality/src/finality_loop_tests.rs | 1 + bridges/relays/utils/src/relay_loop.rs | 28 +++++++++++++++++-- 6 files changed, 46 insertions(+), 12 deletions(-) diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers.rs b/bridges/relays/bin-substrate/src/cli/relay_headers.rs index 346790f2ae..cbcbbf8926 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers.rs @@ -102,6 +102,7 @@ impl RelayHeaders { Finality::new(target_client.clone(), target_sign), source_client, target_client, + false, metrics_params, ) .await diff --git a/bridges/relays/bin-substrate/src/finality_pipeline.rs b/bridges/relays/bin-substrate/src/finality_pipeline.rs index 9538f35778..dad69b1576 100644 --- a/bridges/relays/bin-substrate/src/finality_pipeline.rs +++ b/bridges/relays/bin-substrate/src/finality_pipeline.rs @@ -112,6 +112,7 @@ pub async fn run( pipeline: P, source_client: Client, target_client: Client, + is_on_demand_task: bool, metrics_params: MetricsParams, ) -> anyhow::Result<()> where @@ -137,6 +138,7 @@ where FinalitySource::new(source_client), SubstrateFinalityTarget::new(target_client, pipeline), FinalitySyncParams { + is_on_demand_task, tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL), recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT, stall_timeout: STALL_TIMEOUT, diff --git a/bridges/relays/bin-substrate/src/on_demand_headers.rs b/bridges/relays/bin-substrate/src/on_demand_headers.rs index d0aff845de..3b65ac7068 100644 --- a/bridges/relays/bin-substrate/src/on_demand_headers.rs +++ b/bridges/relays/bin-substrate/src/on_demand_headers.rs @@ -300,15 +300,20 @@ fn select_on_demand_relay_action( .checked_sub(&best_finalized_source_header_at_target) .unwrap_or_else(Zero::zero); if current_headers_difference > maximal_headers_difference { - log::trace!( - target: "bridge", - "Too many {} headers missing at target in {} relay. Going to sync up to the {}", - C::NAME, - relay_task_name, - best_finalized_source_header_at_source, - ); - required_source_header_at_target = best_finalized_source_header_at_source; + + // don't log if relay is already running + if !is_active { + log::trace!( + target: "bridge", + "Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}", + C::NAME, + relay_task_name, + best_finalized_source_header_at_source, + best_finalized_source_header_at_target, + best_finalized_source_header_at_source, + ); + } } // now let's select what to do with relay @@ -345,7 +350,7 @@ where TargetSign: 'static, { let headers_relay_future = - crate::finality_pipeline::run(pipeline, source_client, target_client, MetricsParams::disabled()); + crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled()); let closure_task_name = task_name.clone(); async_std::task::Builder::new() .name(task_name.clone()) diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index cce3283990..3aa55a8ac5 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -39,6 +39,8 @@ use std::{ /// Finality proof synchronization loop parameters. #[derive(Debug, Clone)] pub struct FinalitySyncParams { + /// If `true`, then the separate async task for running finality loop is NOT spawned. + pub is_on_demand_task: bool, /// Interval at which we check updates on both clients. Normally should be larger than /// `min(source_block_time, target_block_time)`. /// @@ -105,6 +107,7 @@ pub async fn run( ) -> Result<(), String> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) + .spawn_loop_task(!sync_params.is_on_demand_task) .with_metrics(Some(metrics_prefix::

()), metrics_params) .loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))? .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index f7826ead73..645aeb1777 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -197,6 +197,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync data: clients_data.clone(), }; let sync_params = FinalitySyncParams { + is_on_demand_task: false, tick: Duration::from_secs(0), recent_finality_proofs_limit: 1024, stall_timeout: Duration::from_secs(1), diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index ea2e7623ee..46cd242ce1 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -38,6 +38,7 @@ pub trait Client: 'static + Clone + Send + Sync { pub fn relay_loop(source_client: SC, target_client: TC) -> Loop { Loop { reconnect_delay: RECONNECT_DELAY, + spawn_loop_task: true, source_client, target_client, loop_metric: None, @@ -49,6 +50,7 @@ pub fn relay_metrics(prefix: Option, params: MetricsParams) -> LoopMetri LoopMetrics { relay_loop: Loop { reconnect_delay: RECONNECT_DELAY, + spawn_loop_task: true, source_client: (), target_client: (), loop_metric: None, @@ -63,6 +65,7 @@ pub fn relay_metrics(prefix: Option, params: MetricsParams) -> LoopMetri /// Generic relay loop. pub struct Loop { reconnect_delay: Duration, + spawn_loop_task: bool, source_client: SC, target_client: TC, loop_metric: Option, @@ -84,11 +87,23 @@ impl Loop { self } + /// Set spawn-dedicated-loop-task flag. + /// + /// If `true` (default), separate async task is spawned to run relay loop. This is the default + /// behavior for all loops. If `false`, then loop is executed as a part of the current + /// task. The `false` is used for on-demand tasks, which are cancelled from time to time + /// and there's already a dedicated on-demand task for running such loops. + pub fn spawn_loop_task(mut self, spawn_loop_task: bool) -> Self { + self.spawn_loop_task = spawn_loop_task; + self + } + /// Start building loop metrics using given prefix. pub fn with_metrics(self, prefix: Option, params: MetricsParams) -> LoopMetrics { LoopMetrics { relay_loop: Loop { reconnect_delay: self.reconnect_delay, + spawn_loop_task: self.spawn_loop_task, source_client: self.source_client, target_client: self.target_client, loop_metric: None, @@ -113,7 +128,8 @@ impl Loop { TC: 'static + Client, LM: 'static + Send + Clone, { - async_std::task::spawn(async move { + let spawn_loop_task = self.spawn_loop_task; + let run_loop_task = async move { crate::initialize::initialize_loop(loop_name); loop { @@ -162,8 +178,13 @@ impl Loop { } Ok(()) - }) - .await + }; + + if spawn_loop_task { + async_std::task::spawn(run_loop_task).await + } else { + run_loop_task.await + } } } @@ -239,6 +260,7 @@ impl LoopMetrics { Ok(Loop { reconnect_delay: self.relay_loop.reconnect_delay, + spawn_loop_task: self.relay_loop.spawn_loop_task, source_client: self.relay_loop.source_client, target_client: self.relay_loop.target_client, loop_metric: self.loop_metric,