Backoff on connection-unrelated errors (#178)

* backoff on connection-unrelated errors

* cargo fmt --all

* Fix some typos

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-07-08 21:53:21 +03:00
committed by Bastian Köcher
parent 8fd0a4688f
commit 1cd7be9214
2 changed files with 114 additions and 17 deletions
+1
View File
@@ -10,6 +10,7 @@ ansi_term = "0.12"
async-std = "=1.5.0"
async-stream = "0.2.0"
async-trait = "0.1.36"
backoff = "0.1"
clap = { version = "2.33.1", features = ["yaml"] }
codec = { package = "parity-scale-codec", version = "1.0.0" }
env_logger = "0.7.0"
+113 -17
View File
@@ -20,6 +20,7 @@ use crate::sync_types::{
};
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::{future::FutureExt, stream::StreamExt};
use num_traits::{Saturating, Zero};
use std::{
@@ -44,6 +45,9 @@ const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Delay after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
/// Max delay after connection-unrelated error happened before we'll try the
/// same request again.
const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
/// Source client trait.
#[async_trait]
@@ -128,6 +132,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut stall_countdown = None;
let mut last_update_time = Instant::now();
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = false;
let mut source_best_block_number_required = false;
let source_best_block_number_future = source_client.best_block_number().fuse();
@@ -138,6 +143,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(source_tick).fuse();
let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = false;
let mut target_best_block_required = false;
let mut target_incomplete_headers_required = true;
@@ -175,45 +181,50 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
source_client_is_online = process_future_result(
source_best_block_number,
&mut source_retry_backoff,
|source_best_block_number| sync.source_best_header_number_response(source_best_block_number),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
);
},
source_new_header = source_new_header_future => {
source_client_is_online = process_future_result(
source_new_header,
&mut source_retry_backoff,
|source_new_header| sync.headers_mut().header_response(source_new_header),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
);
},
source_orphan_header = source_orphan_header_future => {
source_client_is_online = process_future_result(
source_orphan_header,
&mut source_retry_backoff,
|source_orphan_header| sync.headers_mut().header_response(source_orphan_header),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
);
},
source_extra = source_extra_future => {
source_client_is_online = process_future_result(
source_extra,
&mut source_retry_backoff,
|(header, extra)| sync.headers_mut().extra_response(&header, extra),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
);
},
source_completion = source_completion_future => {
source_client_is_online = process_future_result(
source_completion,
&mut source_retry_backoff,
|(header, completion)| sync.headers_mut().completion_response(&header, completion),
&mut source_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving completion data from {} node", P::SOURCE_NAME),
);
},
@@ -230,6 +241,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_client_is_online = process_future_result(
target_best_block,
&mut target_retry_backoff,
|target_best_block| {
let head_updated = sync.target_best_header_response(target_best_block);
if head_updated {
@@ -264,7 +276,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
}
},
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
);
},
@@ -273,20 +285,22 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_client_is_online = process_future_result(
incomplete_headers_ids,
&mut target_retry_backoff,
|incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME),
);
},
target_existence_status = target_existence_status_future => {
target_client_is_online = process_future_result(
target_existence_status,
&mut target_retry_backoff,
|(target_header, target_existence_status)| sync
.headers_mut()
.maybe_orphan_response(&target_header, target_existence_status),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
);
},
@@ -297,9 +311,10 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_client_is_online = process_future_result(
maybe_fatal_error,
&mut target_retry_backoff,
|_| {},
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
);
@@ -309,20 +324,22 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
target_complete_header_result = target_complete_header_future => {
target_client_is_online = process_future_result(
target_complete_header_result,
&mut target_retry_backoff,
|completed_header| sync.headers_mut().header_completed(&completed_header),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error completing headers at {}", P::TARGET_NAME),
);
},
target_extra_check_result = target_extra_check_future => {
target_client_is_online = process_future_result(
target_extra_check_result,
&mut target_retry_backoff,
|(header, extra_check_result)| sync
.headers_mut()
.maybe_extra_response(&header, extra_check_result),
&mut target_go_offline_future,
|| async_std::task::sleep(CONNECTION_ERROR_DELAY),
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
);
},
@@ -522,6 +539,15 @@ fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
})
}
/// Exponential backoff for connection-unrelated errors retries.
fn retry_backoff() -> ExponentialBackoff {
let mut backoff = ExponentialBackoff::default();
// we do not want relayer to stop
backoff.max_elapsed_time = None;
backoff.max_interval = MAX_BACKOFF_INTERVAL;
backoff
}
/// Process result of the future from a client.
///
/// Returns whether or not the client we're interacting with is online. In this context
@@ -529,9 +555,10 @@ fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
/// that we've previously sent.
fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
retry_backoff: &mut ExponentialBackoff,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce() -> TGoOfflineFuture,
go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
error_pattern: impl FnOnce() -> String,
) -> bool
where
@@ -543,16 +570,26 @@ where
match result {
Ok(result) => {
on_success(result);
retry_backoff.reset();
client_is_online = true
}
Err(error) => {
if error.is_connection_error() {
go_offline_future.set(go_offline().fuse());
let is_connection_error = error.is_connection_error();
let retry_delay = if is_connection_error {
retry_backoff.reset();
CONNECTION_ERROR_DELAY
} else {
client_is_online = true
}
retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY)
};
go_offline_future.set(go_offline(retry_delay).fuse());
log::error!(target: "bridge", "{}: {:?}", error_pattern(), error);
log::error!(
target: "bridge",
"{}: {:?}. Retrying in {}s",
error_pattern(),
error,
retry_delay.as_secs_f64(),
);
}
}
@@ -587,3 +624,62 @@ fn print_sync_progress<P: HeadersSyncPipeline>(
);
(now_time, now_best_header.clone().map(|id| id.0), *now_target_header)
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug)]
struct TestError(bool);
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
self.0
}
}
fn run_backoff_test(result: Result<(), TestError>) -> (Duration, Duration) {
let mut backoff = retry_backoff();
// no randomness in tests (otherwise intervals may overlap => asserts are failing)
backoff.randomization_factor = 0f64;
// increase backoff's current interval
let interval1 = backoff.next_backoff().unwrap();
let interval2 = backoff.next_backoff().unwrap();
assert!(interval2 > interval1);
// successful future result leads to backoff's reset
let go_offline_future = futures::future::Fuse::terminated();
futures::pin_mut!(go_offline_future);
process_future_result(
result,
&mut backoff,
|_| {},
&mut go_offline_future,
|delay| async_std::task::sleep(delay),
|| unreachable!(),
);
(interval2, backoff.next_backoff().unwrap())
}
#[test]
fn process_future_result_resets_backoff_on_success() {
let (interval2, interval_after_reset) = run_backoff_test(Ok(()));
assert!(interval2 > interval_after_reset);
}
#[test]
fn process_future_result_resets_backoff_on_connection_error() {
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(true)));
assert!(interval2 > interval_after_reset);
}
#[test]
fn process_future_result_does_not_reset_backoff_on_non_connection_error() {
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(false)));
assert!(interval2 < interval_after_reset);
}
}