reonnect to failed client in on-demand relay background task (#936)

This commit is contained in:
Svyatoslav Nikolsky
2021-04-27 16:33:28 +03:00
committed by Bastian Köcher
parent 38cd3a328d
commit 622a4b5c6f
2 changed files with 92 additions and 56 deletions
@@ -32,7 +32,10 @@ use relay_substrate_client::{
finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf,
SyncHeader,
};
use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId,
MaybeConnectionError,
};
use std::fmt::Debug;
/// On-demand Substrate <-> Substrate headers relay.
@@ -132,11 +135,11 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
{
let relay_task_name = on_demand_headers_relay_name::<SourceChain, TargetChain>();
let finality_source = SubstrateFinalitySource::<
let mut finality_source = SubstrateFinalitySource::<
_,
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
>::new(source_client.clone());
let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
let mut active_headers_relay = None;
let mut required_header_number = Zero::zero();
@@ -171,15 +174,35 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
// read best finalized source header number from source
let best_finalized_source_header_at_source =
best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Source,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue;
}
// read best finalized source header number from target
let best_finalized_source_header_at_target =
best_finalized_source_header_at_target::<SourceChain, _, _>(&finality_target, &relay_task_name).await;
if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Target,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue;
}
// start or stop headers relay if required
let action = select_on_demand_relay_action::<SourceChain>(
best_finalized_source_header_at_source,
best_finalized_source_header_at_target,
best_finalized_source_header_at_source.ok(),
best_finalized_source_header_at_target.ok(),
required_header_number,
maximal_headers_difference,
&relay_task_name,
@@ -213,25 +236,21 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
finality_source: &SubstrateFinalitySource<SourceChain, P>,
relay_task_name: &str,
) -> Option<SourceChain::BlockNumber>
) -> Result<SourceChain::BlockNumber, <SubstrateFinalitySource<SourceChain, P> as RelayClient>::Error>
where
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
{
finality_source
.best_finalized_block_number()
.await
.map(Some)
.unwrap_or_else(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from source in {} relay: {:?}",
relay_task_name,
error,
);
finality_source.best_finalized_block_number().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from source in {} relay: {:?}",
relay_task_name,
error,
);
None
})
error
})
}
/// Read best finalized source block number from target client.
@@ -240,7 +259,7 @@ where
async fn best_finalized_source_header_at_target<SourceChain: Chain, TargetChain: Chain, P>(
finality_target: &SubstrateFinalityTarget<TargetChain, P>,
relay_task_name: &str,
) -> Option<SourceChain::BlockNumber>
) -> Result<SourceChain::BlockNumber, <SubstrateFinalityTarget<TargetChain, P> as RelayClient>::Error>
where
SubstrateFinalityTarget<TargetChain, P>: FinalityTargetClient<P>,
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
@@ -248,8 +267,7 @@ where
finality_target
.best_finalized_source_block_number()
.await
.map(Some)
.unwrap_or_else(|error| {
.map_err(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from target in {} relay: {:?}",
@@ -257,7 +275,7 @@ where
error,
);
None
error
})
}
+51 -33
View File
@@ -139,39 +139,15 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
match result {
Ok(()) => break,
Err(failed_client) => loop {
async_std::task::sleep(self.reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match self.source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
self.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match self.target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
self.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
break;
},
Err(failed_client) => {
reconnect_failed_client(
failed_client,
self.reconnect_delay,
&mut self.source_client,
&mut self.target_client,
)
.await
}
}
log::debug!(target: "bridge", "Restarting relay loop");
@@ -268,6 +244,48 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
}
}
/// Deal with the client who has returned connection error.
pub async fn reconnect_failed_client(
failed_client: FailedClient,
reconnect_delay: Duration,
source_client: &mut impl Client,
target_client: &mut impl Client,
) {
loop {
async_std::task::sleep(reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
break;
}
}
/// Create new registry with global metrics.
fn create_metrics_registry(prefix: Option<String>) -> Registry {
match prefix {