diff --git a/bridges/relays/ethereum/Cargo.toml b/bridges/relays/ethereum/Cargo.toml index 742c1e3bc8..6619a3ca7e 100644 --- a/bridges/relays/ethereum/Cargo.toml +++ b/bridges/relays/ethereum/Cargo.toml @@ -10,6 +10,7 @@ ansi_term = "0.12" async-std = "=1.5.0" async-stream = "0.2.0" async-trait = "0.1.36" +backoff = "0.1" clap = { version = "2.33.1", features = ["yaml"] } codec = { package = "parity-scale-codec", version = "1.0.0" } env_logger = "0.7.0" diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs index e13b52469d..e1437ccdc5 100644 --- a/bridges/relays/ethereum/src/sync_loop.rs +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -20,6 +20,7 @@ use crate::sync_types::{ }; use async_trait::async_trait; +use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::{future::FutureExt, stream::StreamExt}; use num_traits::{Saturating, Zero}; use std::{ @@ -44,6 +45,9 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); /// Delay after connection-related error happened before we'll try /// reconnection again. const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); +/// Max delay after connection-unrelated error happened before we'll try the +/// same request again. +const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60); /// Source client trait. #[async_trait] @@ -128,6 +132,7 @@ pub fn run>( let mut stall_countdown = None; let mut last_update_time = Instant::now(); + let mut source_retry_backoff = retry_backoff(); let mut source_client_is_online = false; let mut source_best_block_number_required = false; let source_best_block_number_future = source_client.best_block_number().fuse(); @@ -138,6 +143,7 @@ pub fn run>( let source_go_offline_future = futures::future::Fuse::terminated(); let source_tick_stream = interval(source_tick).fuse(); + let mut target_retry_backoff = retry_backoff(); let mut target_client_is_online = false; let mut target_best_block_required = false; let mut target_incomplete_headers_required = true; @@ -175,45 +181,50 @@ pub fn run>( source_client_is_online = process_future_result( source_best_block_number, + &mut source_retry_backoff, |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), &mut source_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving best header number from {}", P::SOURCE_NAME), ); }, source_new_header = source_new_header_future => { source_client_is_online = process_future_result( source_new_header, + &mut source_retry_backoff, |source_new_header| sync.headers_mut().header_response(source_new_header), &mut source_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving header from {} node", P::SOURCE_NAME), ); }, source_orphan_header = source_orphan_header_future => { source_client_is_online = process_future_result( source_orphan_header, + &mut source_retry_backoff, |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), &mut source_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), ); }, source_extra = source_extra_future => { source_client_is_online = process_future_result( source_extra, + &mut source_retry_backoff, |(header, extra)| sync.headers_mut().extra_response(&header, extra), &mut source_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), ); }, source_completion = source_completion_future => { source_client_is_online = process_future_result( source_completion, + &mut source_retry_backoff, |(header, completion)| sync.headers_mut().completion_response(&header, completion), &mut source_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), ); }, @@ -230,6 +241,7 @@ pub fn run>( target_client_is_online = process_future_result( target_best_block, + &mut target_retry_backoff, |target_best_block| { let head_updated = sync.target_best_header_response(target_best_block); if head_updated { @@ -264,7 +276,7 @@ pub fn run>( } }, &mut target_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving best known header from {} node", P::TARGET_NAME), ); }, @@ -273,20 +285,22 @@ pub fn run>( target_client_is_online = process_future_result( incomplete_headers_ids, + &mut target_retry_backoff, |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), &mut target_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), ); }, target_existence_status = target_existence_status_future => { target_client_is_online = process_future_result( target_existence_status, + &mut target_retry_backoff, |(target_header, target_existence_status)| sync .headers_mut() .maybe_orphan_response(&target_header, target_existence_status), &mut target_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving existence status from {} node", P::TARGET_NAME), ); }, @@ -297,9 +311,10 @@ pub fn run>( target_client_is_online = process_future_result( maybe_fatal_error, + &mut target_retry_backoff, |_| {}, &mut target_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error submitting headers to {} node", P::TARGET_NAME), ); @@ -309,20 +324,22 @@ pub fn run>( target_complete_header_result = target_complete_header_future => { target_client_is_online = process_future_result( target_complete_header_result, + &mut target_retry_backoff, |completed_header| sync.headers_mut().header_completed(&completed_header), &mut target_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error completing headers at {}", P::TARGET_NAME), ); }, target_extra_check_result = target_extra_check_future => { target_client_is_online = process_future_result( target_extra_check_result, + &mut target_retry_backoff, |(header, extra_check_result)| sync .headers_mut() .maybe_extra_response(&header, extra_check_result), &mut target_go_offline_future, - || async_std::task::sleep(CONNECTION_ERROR_DELAY), + |delay| async_std::task::sleep(delay), || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), ); }, @@ -522,6 +539,15 @@ fn interval(timeout: Duration) -> impl futures::Stream { }) } +/// Exponential backoff for connection-unrelated errors retries. +fn retry_backoff() -> ExponentialBackoff { + let mut backoff = ExponentialBackoff::default(); + // we do not want relayer to stop + backoff.max_elapsed_time = None; + backoff.max_interval = MAX_BACKOFF_INTERVAL; + backoff +} + /// Process result of the future from a client. /// /// Returns whether or not the client we're interacting with is online. In this context @@ -529,9 +555,10 @@ fn interval(timeout: Duration) -> impl futures::Stream { /// that we've previously sent. fn process_future_result( result: Result, + retry_backoff: &mut ExponentialBackoff, on_success: impl FnOnce(TResult), go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse>, - go_offline: impl FnOnce() -> TGoOfflineFuture, + go_offline: impl FnOnce(Duration) -> TGoOfflineFuture, error_pattern: impl FnOnce() -> String, ) -> bool where @@ -543,16 +570,26 @@ where match result { Ok(result) => { on_success(result); + retry_backoff.reset(); client_is_online = true } Err(error) => { - if error.is_connection_error() { - go_offline_future.set(go_offline().fuse()); + let is_connection_error = error.is_connection_error(); + let retry_delay = if is_connection_error { + retry_backoff.reset(); + CONNECTION_ERROR_DELAY } else { - client_is_online = true - } + retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY) + }; + go_offline_future.set(go_offline(retry_delay).fuse()); - log::error!(target: "bridge", "{}: {:?}", error_pattern(), error); + log::error!( + target: "bridge", + "{}: {:?}. Retrying in {}s", + error_pattern(), + error, + retry_delay.as_secs_f64(), + ); } } @@ -587,3 +624,62 @@ fn print_sync_progress( ); (now_time, now_best_header.clone().map(|id| id.0), *now_target_header) } + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug)] + struct TestError(bool); + + impl MaybeConnectionError for TestError { + fn is_connection_error(&self) -> bool { + self.0 + } + } + + fn run_backoff_test(result: Result<(), TestError>) -> (Duration, Duration) { + let mut backoff = retry_backoff(); + + // no randomness in tests (otherwise intervals may overlap => asserts are failing) + backoff.randomization_factor = 0f64; + + // increase backoff's current interval + let interval1 = backoff.next_backoff().unwrap(); + let interval2 = backoff.next_backoff().unwrap(); + assert!(interval2 > interval1); + + // successful future result leads to backoff's reset + let go_offline_future = futures::future::Fuse::terminated(); + futures::pin_mut!(go_offline_future); + + process_future_result( + result, + &mut backoff, + |_| {}, + &mut go_offline_future, + |delay| async_std::task::sleep(delay), + || unreachable!(), + ); + + (interval2, backoff.next_backoff().unwrap()) + } + + #[test] + fn process_future_result_resets_backoff_on_success() { + let (interval2, interval_after_reset) = run_backoff_test(Ok(())); + assert!(interval2 > interval_after_reset); + } + + #[test] + fn process_future_result_resets_backoff_on_connection_error() { + let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(true))); + assert!(interval2 > interval_after_reset); + } + + #[test] + fn process_future_result_does_not_reset_backoff_on_non_connection_error() { + let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(false))); + assert!(interval2 < interval_after_reset); + } +}