mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-31 04:01:02 +00:00
Basic tests for sync loop (#182)
* basic sync loop tests * cargo ftm --all * SyncLoopTestParams * move sync loop tests to sync_loop_tests.rs * cargo fmt --all
This commit is contained in:
committed by
Bastian Köcher
parent
ae8c82f0e7
commit
83a3fca5cf
@@ -25,6 +25,7 @@ use futures::{future::FutureExt, stream::StreamExt};
|
||||
use num_traits::{Saturating, Zero};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
future::Future,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
@@ -123,6 +124,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
target_client: TC,
|
||||
target_tick: Duration,
|
||||
sync_params: HeadersSyncParams,
|
||||
exit_signal: impl Future<Output = ()>,
|
||||
) {
|
||||
let mut local_pool = futures::executor::LocalPool::new();
|
||||
let mut progress_context = (Instant::now(), None, None);
|
||||
@@ -156,6 +158,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
let target_go_offline_future = futures::future::Fuse::terminated();
|
||||
let target_tick_stream = interval(target_tick).fuse();
|
||||
|
||||
let exit_signal = exit_signal.fuse();
|
||||
|
||||
futures::pin_mut!(
|
||||
source_best_block_number_future,
|
||||
source_new_header_future,
|
||||
@@ -171,7 +175,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
target_submit_header_future,
|
||||
target_complete_header_future,
|
||||
target_go_offline_future,
|
||||
target_tick_stream
|
||||
target_tick_stream,
|
||||
exit_signal
|
||||
);
|
||||
|
||||
loop {
|
||||
@@ -350,6 +355,10 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
target_best_block_required = true;
|
||||
target_incomplete_headers_required = true;
|
||||
},
|
||||
|
||||
_ = exit_signal => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// print progress
|
||||
@@ -491,7 +500,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
|
||||
id,
|
||||
);
|
||||
source_extra_future.set(source_client.header_extra(id, header.clone()).fuse());
|
||||
} else if let Some(header) = sync.headers().header(HeaderStatus::Orphan) {
|
||||
} else if let Some(header) = sync.select_orphan_header_to_download() {
|
||||
// for Orphan we actually ask for parent' header
|
||||
let parent_id = header.parent_id();
|
||||
|
||||
@@ -540,7 +549,7 @@ fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
|
||||
}
|
||||
|
||||
/// Exponential backoff for connection-unrelated errors retries.
|
||||
fn retry_backoff() -> ExponentialBackoff {
|
||||
pub(crate) fn retry_backoff() -> ExponentialBackoff {
|
||||
let mut backoff = ExponentialBackoff::default();
|
||||
// we do not want relayer to stop
|
||||
backoff.max_elapsed_time = None;
|
||||
@@ -553,7 +562,7 @@ fn retry_backoff() -> ExponentialBackoff {
|
||||
/// Returns whether or not the client we're interacting with is online. In this context
|
||||
/// what online means is that the client is currently not handling any other requests
|
||||
/// that we've previously sent.
|
||||
fn process_future_result<TResult, TError, TGoOfflineFuture>(
|
||||
pub(crate) fn process_future_result<TResult, TError, TGoOfflineFuture>(
|
||||
result: Result<TResult, TError>,
|
||||
retry_backoff: &mut ExponentialBackoff,
|
||||
on_success: impl FnOnce(TResult),
|
||||
@@ -624,62 +633,3 @@ fn print_sync_progress<P: HeadersSyncPipeline>(
|
||||
);
|
||||
(now_time, now_best_header.clone().map(|id| id.0), *now_target_header)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestError(bool);
|
||||
|
||||
impl MaybeConnectionError for TestError {
|
||||
fn is_connection_error(&self) -> bool {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
fn run_backoff_test(result: Result<(), TestError>) -> (Duration, Duration) {
|
||||
let mut backoff = retry_backoff();
|
||||
|
||||
// no randomness in tests (otherwise intervals may overlap => asserts are failing)
|
||||
backoff.randomization_factor = 0f64;
|
||||
|
||||
// increase backoff's current interval
|
||||
let interval1 = backoff.next_backoff().unwrap();
|
||||
let interval2 = backoff.next_backoff().unwrap();
|
||||
assert!(interval2 > interval1);
|
||||
|
||||
// successful future result leads to backoff's reset
|
||||
let go_offline_future = futures::future::Fuse::terminated();
|
||||
futures::pin_mut!(go_offline_future);
|
||||
|
||||
process_future_result(
|
||||
result,
|
||||
&mut backoff,
|
||||
|_| {},
|
||||
&mut go_offline_future,
|
||||
|delay| async_std::task::sleep(delay),
|
||||
|| "Test error".into(),
|
||||
);
|
||||
|
||||
(interval2, backoff.next_backoff().unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_future_result_resets_backoff_on_success() {
|
||||
let (interval2, interval_after_reset) = run_backoff_test(Ok(()));
|
||||
assert!(interval2 > interval_after_reset);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_future_result_resets_backoff_on_connection_error() {
|
||||
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(true)));
|
||||
assert!(interval2 > interval_after_reset);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_future_result_does_not_reset_backoff_on_non_connection_error() {
|
||||
let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(false)));
|
||||
assert!(interval2 < interval_after_reset);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user