Extract common part of relay loops (#660)

* extract common parts of relay loops: begin

* merge client impls

* backoff in exchange loop

* reconnect without clone
This commit is contained in:
Svyatoslav Nikolsky
2021-01-26 17:14:33 +03:00
committed by Bastian Köcher
parent 926520292e
commit 44bf84269a
23 changed files with 1016 additions and 776 deletions
+41 -22
View File
@@ -17,7 +17,9 @@
//! Relaying proofs of exchange transaction.
use async_trait::async_trait;
use relay_utils::{MaybeConnectionError, StringifiedMaybeConnectionError};
use relay_utils::{
relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError,
};
use std::{
fmt::{Debug, Display},
string::ToString,
@@ -84,10 +86,7 @@ pub type HeaderId<P> = relay_utils::HeaderId<BlockHashOf<P>, BlockNumberOf<P>>;
/// Source client API.
#[async_trait]
pub trait SourceClient<P: TransactionProofPipeline> {
/// Error type.
type Error: Debug + MaybeConnectionError;
pub trait SourceClient<P: TransactionProofPipeline>: RelayClient {
/// Sleep until exchange-related data is (probably) updated.
async fn tick(&self);
/// Get block by hash.
@@ -104,10 +103,7 @@ pub trait SourceClient<P: TransactionProofPipeline> {
/// Target client API.
#[async_trait]
pub trait TargetClient<P: TransactionProofPipeline> {
/// Error type.
type Error: Debug + MaybeConnectionError;
pub trait TargetClient<P: TransactionProofPipeline>: RelayClient {
/// Sleep until exchange-related data is (probably) updated.
async fn tick(&self);
/// Returns `Ok(true)` if header is known to the target node.
@@ -146,7 +142,7 @@ pub async fn relay_block_transactions<P: TransactionProofPipeline>(
target_client: &impl TargetClient<P>,
source_block: &P::Block,
mut relayed_transactions: RelayedBlockTransactions,
) -> Result<RelayedBlockTransactions, RelayedBlockTransactions> {
) -> Result<RelayedBlockTransactions, (FailedClient, RelayedBlockTransactions)> {
let transactions_to_process = source_block
.transactions()
.into_iter()
@@ -156,16 +152,21 @@ pub async fn relay_block_transactions<P: TransactionProofPipeline>(
let result = async {
let source_tx_id = format!("{}/{}", source_block.id().1, source_tx_index);
let source_tx_proof =
prepare_transaction_proof(source_client, &source_tx_id, source_block, source_tx_index).await?;
prepare_transaction_proof(source_client, &source_tx_id, source_block, source_tx_index)
.await
.map_err(|e| (FailedClient::Source, e))?;
let needs_to_be_relayed =
target_client
.filter_transaction_proof(&source_tx_proof)
.await
.map_err(|err| {
StringifiedMaybeConnectionError::new(
err.is_connection_error(),
format!("Transaction filtering has failed with {:?}", err),
(
FailedClient::Target,
StringifiedMaybeConnectionError::new(
err.is_connection_error(),
format!("Transaction filtering has failed with {:?}", err),
),
)
})?;
@@ -176,6 +177,7 @@ pub async fn relay_block_transactions<P: TransactionProofPipeline>(
relay_ready_transaction_proof(target_client, &source_tx_id, source_tx_proof)
.await
.map(|_| true)
.map_err(|e| (FailedClient::Target, e))
}
.await;
@@ -205,7 +207,7 @@ pub async fn relay_block_transactions<P: TransactionProofPipeline>(
relayed_transactions.processed += 1;
relayed_transactions.relayed += 1;
}
Err(err) => {
Err((failed_client, err)) => {
log::error!(
target: "bridge",
"Error relaying {} transaction {} proof to {} node: {}. {}",
@@ -221,7 +223,7 @@ pub async fn relay_block_transactions<P: TransactionProofPipeline>(
);
if err.is_connection_error() {
return Err(relayed_transactions);
return Err((failed_client, relayed_transactions));
}
relayed_transactions.processed += 1;
@@ -529,8 +531,9 @@ pub(crate) mod tests {
#[derive(Debug, Clone, PartialEq)]
pub struct TestTransactionProof(pub TestTransactionHash);
#[derive(Clone)]
pub struct TestTransactionsSource {
pub on_tick: Box<dyn Fn(&mut TestTransactionsSourceData) + Send + Sync>,
pub on_tick: Arc<dyn Fn(&mut TestTransactionsSourceData) + Send + Sync>,
pub data: Arc<Mutex<TestTransactionsSourceData>>,
}
@@ -543,7 +546,7 @@ pub(crate) mod tests {
impl TestTransactionsSource {
pub fn new(on_tick: Box<dyn Fn(&mut TestTransactionsSourceData) + Send + Sync>) -> Self {
Self {
on_tick,
on_tick: Arc::new(on_tick),
data: Arc::new(Mutex::new(TestTransactionsSourceData {
block: Ok(test_block()),
transaction_block: Ok(Some((test_block_id(), 0))),
@@ -554,9 +557,16 @@ pub(crate) mod tests {
}
#[async_trait]
impl SourceClient<TestTransactionProofPipeline> for TestTransactionsSource {
impl RelayClient for TestTransactionsSource {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
Ok(())
}
}
#[async_trait]
impl SourceClient<TestTransactionProofPipeline> for TestTransactionsSource {
async fn tick(&self) {
(self.on_tick)(&mut *self.data.lock())
}
@@ -584,8 +594,9 @@ pub(crate) mod tests {
}
}
#[derive(Clone)]
pub struct TestTransactionsTarget {
pub on_tick: Box<dyn Fn(&mut TestTransactionsTargetData) + Send + Sync>,
pub on_tick: Arc<dyn Fn(&mut TestTransactionsTargetData) + Send + Sync>,
pub data: Arc<Mutex<TestTransactionsTargetData>>,
}
@@ -600,7 +611,7 @@ pub(crate) mod tests {
impl TestTransactionsTarget {
pub fn new(on_tick: Box<dyn Fn(&mut TestTransactionsTargetData) + Send + Sync>) -> Self {
Self {
on_tick,
on_tick: Arc::new(on_tick),
data: Arc::new(Mutex::new(TestTransactionsTargetData {
is_header_known: Ok(true),
is_header_finalized: Ok(true),
@@ -613,9 +624,16 @@ pub(crate) mod tests {
}
#[async_trait]
impl TargetClient<TestTransactionProofPipeline> for TestTransactionsTarget {
impl RelayClient for TestTransactionsTarget {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
Ok(())
}
}
#[async_trait]
impl TargetClient<TestTransactionProofPipeline> for TestTransactionsTarget {
async fn tick(&self) {
(self.on_tick)(&mut *self.data.lock())
}
@@ -784,6 +802,7 @@ pub(crate) mod tests {
),
pre_relayed,
))
.map_err(|(_, transactions)| transactions)
}
#[test]
@@ -27,13 +27,9 @@ use futures::{future::FutureExt, select};
use num_traits::One;
use relay_utils::{
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
retry_backoff,
retry_backoff, FailedClient, MaybeConnectionError,
};
use std::{future::Future, time::Duration};
/// Delay after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
use std::future::Future;
/// Transactions proofs relay state.
#[derive(Debug)]
@@ -43,7 +39,7 @@ pub struct TransactionProofsRelayState<BlockNumber> {
}
/// Transactions proofs relay storage.
pub trait TransactionProofsRelayStorage {
pub trait TransactionProofsRelayStorage: Clone {
/// Associated block number.
type BlockNumber;
@@ -54,7 +50,7 @@ pub trait TransactionProofsRelayStorage {
}
/// In-memory storage for auto-relay loop.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InMemoryStorage<BlockNumber> {
best_processed_header_number: BlockNumber,
}
@@ -84,68 +80,101 @@ impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorag
/// Run proofs synchronization.
pub fn run<P: TransactionProofPipeline>(
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
let mut local_pool = futures::executor::LocalPool::new();
let exit_signal = exit_signal.shared();
let metrics_global = GlobalMetrics::default();
let metrics_exch = ExchangeLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
&metrics_global,
&metrics_exch,
);
local_pool.run_until(async move {
let mut retry_backoff = retry_backoff();
let mut state = storage.state();
let mut current_finalized_block = None;
let mut metrics_global = GlobalMetrics::default();
let mut metrics_exch = ExchangeLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
&metrics_global,
&metrics_exch,
);
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
loop {
let iteration_result = run_loop_iteration(
&mut storage,
&source_client,
&target_client,
&mut state,
&mut current_finalized_block,
if metrics_enabled { Some(&mut metrics_exch) } else { None },
relay_utils::relay_loop::run(
relay_utils::relay_loop::RECONNECT_DELAY,
source_client,
target_client,
|source_client, target_client| {
run_until_connection_lost(
storage.clone(),
source_client,
target_client,
if metrics_enabled {
Some(metrics_global.clone())
} else {
None
},
if metrics_enabled {
Some(metrics_exch.clone())
} else {
None
},
exit_signal.clone(),
)
.await;
},
);
}
if metrics_enabled {
metrics_global.update();
/// Run proofs synchronization.
async fn run_until_connection_lost<P: TransactionProofPipeline>(
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_global: Option<GlobalMetrics>,
metrics_exch: Option<ExchangeLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let mut retry_backoff = retry_backoff();
let mut state = storage.state();
let mut current_finalized_block = None;
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
loop {
let iteration_result = run_loop_iteration(
&mut storage,
&source_client,
&target_client,
&mut state,
&mut current_finalized_block,
metrics_exch.as_ref(),
)
.await;
if let Some(ref metrics_global) = metrics_global {
metrics_global.update().await;
}
if let Err((is_connection_error, failed_client)) = iteration_result {
if is_connection_error {
return Err(failed_client);
}
match iteration_result {
Ok(_) => {
retry_backoff.reset();
let retry_timeout = retry_backoff
.next_backoff()
.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY);
select! {
_ = async_std::task::sleep(retry_timeout).fuse() => {},
_ = exit_signal => return Ok(()),
}
} else {
retry_backoff.reset();
select! {
_ = source_client.tick().fuse() => {},
_ = exit_signal => return,
}
}
Err(_) => {
let retry_timeout = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
select! {
_ = async_std::task::sleep(retry_timeout).fuse() => {},
_ = exit_signal => return,
}
}
select! {
_ = source_client.tick().fuse() => {},
_ = exit_signal => return Ok(()),
}
}
});
}
}
/// Run exchange loop until we need to break.
@@ -155,8 +184,8 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
target_client: &impl TargetClient<P>,
state: &mut TransactionProofsRelayState<BlockNumberOf<P>>,
current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>,
mut exchange_loop_metrics: Option<&mut ExchangeLoopMetrics>,
) -> Result<(), ()> {
exchange_loop_metrics: Option<&ExchangeLoopMetrics>,
) -> Result<(), (bool, FailedClient)> {
let best_finalized_header_id = match target_client.best_finalized_header_id().await {
Ok(best_finalized_header_id) => {
log::debug!(
@@ -178,7 +207,7 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
err,
);
return Err(());
return Err((err.is_connection_error(), FailedClient::Target));
}
};
@@ -202,7 +231,7 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
state.best_processed_header_number = state.best_processed_header_number + One::one();
storage.set_state(state);
if let Some(exchange_loop_metrics) = exchange_loop_metrics.as_mut() {
if let Some(ref exchange_loop_metrics) = exchange_loop_metrics {
exchange_loop_metrics.update::<P>(
state.best_processed_header_number,
best_finalized_header_id.0,
@@ -212,9 +241,9 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
// we have just updated state => proceed to next block retrieval
}
Err(relayed_transactions) => {
Err((failed_client, relayed_transactions)) => {
*current_finalized_block = Some((block, relayed_transactions));
return Err(());
return Err((true, failed_client));
}
}
}
@@ -240,7 +269,7 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
err,
);
return Err(());
return Err((err.is_connection_error(), FailedClient::Source));
}
}
}
@@ -20,6 +20,7 @@ use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofP
use relay_utils::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
/// Exchange transactions relay metrics.
#[derive(Clone)]
pub struct ExchangeLoopMetrics {
/// Best finalized block numbers - "processed" and "known".
best_block_numbers: GaugeVec<U64>,
@@ -60,7 +61,7 @@ impl Default for ExchangeLoopMetrics {
impl ExchangeLoopMetrics {
/// Update metrics when single block is relayed.
pub fn update<P: TransactionProofPipeline>(
&mut self,
&self,
best_processed_block_number: BlockNumberOf<P>,
best_known_block_number: BlockNumberOf<P>,
relayed_transactions: RelayedBlockTransactions,