Equivocation detection loop: Reorganize block checking logic as state machine (#2555) (#2557)

* Split `reconnect_failed_client()` logic

* Reorganize block checking logic as state machine

This way we'll be able to save the state in case of a failure
This commit is contained in:
Serban Iorga
2023-09-12 16:38:26 +03:00
committed by Bastian Köcher
parent 5e5543a35a
commit c102e812eb
4 changed files with 340 additions and 217 deletions
@@ -15,55 +15,17 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{
reporter::EquivocationsReporter, EquivocationDetectionPipeline, HeaderFinalityInfo,
handle_client_error, reporter::EquivocationsReporter, EquivocationDetectionPipeline,
SourceClient, TargetClient,
};
use bp_header_chain::{FinalityProof, FindEquivocations};
use crate::block_checker::BlockChecker;
use finality_relay::{FinalityProofsBuf, FinalityProofsStream};
use futures::{select, FutureExt};
use num_traits::Saturating;
use relay_utils::{
metrics::MetricsParams,
relay_loop::{reconnect_failed_client, RECONNECT_DELAY},
FailedClient, MaybeConnectionError,
};
use relay_utils::{metrics::MetricsParams, FailedClient};
use std::{future::Future, time::Duration};
/// The context needed for finding equivocations inside finality proofs and reporting them.
struct EquivocationReportingContext<P: EquivocationDetectionPipeline> {
synced_header_hash: P::Hash,
synced_verification_context: P::FinalityVerificationContext,
}
impl<P: EquivocationDetectionPipeline> EquivocationReportingContext<P> {
/// Try to get the `EquivocationReportingContext` used by the target chain
/// at the provided block.
async fn try_read_from_target<TC: TargetClient<P>>(
target_client: &TC,
at: P::TargetNumber,
) -> Result<Option<Self>, TC::Error> {
let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?;
Ok(match maybe_best_synced_header_hash {
Some(best_synced_header_hash) => Some(EquivocationReportingContext {
synced_header_hash: best_synced_header_hash,
synced_verification_context: target_client
.finality_verification_context(at)
.await?,
}),
None => None,
})
}
/// Update with the new context introduced by the `HeaderFinalityInfo<P>` if any.
fn update(&mut self, info: HeaderFinalityInfo<P>) {
if let Some(new_verification_context) = info.new_verification_context {
self.synced_header_hash = info.finality_proof.target_header_hash();
self.synced_verification_context = new_verification_context;
}
}
}
/// Equivocations detection loop state.
struct EquivocationDetectionLoop<
P: EquivocationDetectionPipeline,
@@ -85,34 +47,6 @@ struct EquivocationDetectionLoop<
impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
EquivocationDetectionLoop<P, SC, TC>
{
async fn handle_source_error(&mut self, e: SC::Error) {
if e.is_connection_error() {
reconnect_failed_client(
FailedClient::Source,
RECONNECT_DELAY,
&mut self.source_client,
&mut self.target_client,
)
.await;
} else {
async_std::task::sleep(RECONNECT_DELAY).await;
}
}
async fn handle_target_error(&mut self, e: TC::Error) {
if e.is_connection_error() {
reconnect_failed_client(
FailedClient::Target,
RECONNECT_DELAY,
&mut self.source_client,
&mut self.target_client,
)
.await;
} else {
async_std::task::sleep(RECONNECT_DELAY).await;
}
}
async fn ensure_finality_proofs_stream(&mut self) {
match self.finality_proofs_stream.ensure_stream(&self.source_client).await {
Ok(_) => {},
@@ -124,7 +58,7 @@ impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
);
// Reconnect to the source client if needed
self.handle_source_error(e).await
handle_client_error(&mut self.source_client, e).await;
},
}
}
@@ -140,116 +74,13 @@ impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
);
// Reconnect target client and move on
self.handle_target_error(e).await;
handle_client_error(&mut self.target_client, e).await;
None
},
}
}
async fn build_equivocation_reporting_context(
&mut self,
block_num: P::TargetNumber,
) -> Option<EquivocationReportingContext<P>> {
match EquivocationReportingContext::try_read_from_target(
&self.target_client,
block_num.saturating_sub(1.into()),
)
.await
{
Ok(Some(context)) => Some(context),
Ok(None) => None,
Err(e) => {
log::error!(
target: "bridge",
"Could not read {} `EquivocationReportingContext` from {} at block {block_num}: {e:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
);
// Reconnect target client if needed and move on.
self.handle_target_error(e).await;
None
},
}
}
/// Try to get the finality info associated to the source headers synced with the target chain
/// at the specified block.
async fn synced_source_headers_at_target(
&mut self,
at: P::TargetNumber,
) -> Vec<HeaderFinalityInfo<P>> {
match self.target_client.synced_headers_finality_info(at).await {
Ok(synced_headers) => synced_headers,
Err(e) => {
log::error!(
target: "bridge",
"Could not get {} headers synced to {} at block {at:?}",
P::SOURCE_NAME,
P::TARGET_NAME
);
// Reconnect in case of a connection error.
self.handle_target_error(e).await;
// And move on to the next block.
vec![]
},
}
}
async fn report_equivocation(&mut self, at: P::Hash, equivocation: P::EquivocationProof) {
match self.reporter.submit_report(&self.source_client, at, equivocation.clone()).await {
Ok(_) => {},
Err(e) => {
log::error!(
target: "bridge",
"Could not submit equivocation report to {} for {equivocation:?}: {e:?}",
P::SOURCE_NAME,
);
// Reconnect source client and move on
self.handle_source_error(e).await;
},
}
}
async fn check_block(
&mut self,
block_num: P::TargetNumber,
context: &mut EquivocationReportingContext<P>,
) {
let synced_headers = self.synced_source_headers_at_target(block_num).await;
for synced_header in synced_headers {
self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
let equivocations = match P::EquivocationsFinder::find_equivocations(
&context.synced_verification_context,
&synced_header.finality_proof,
self.finality_proofs_buf.buf().as_slice(),
) {
Ok(equivocations) => equivocations,
Err(e) => {
log::error!(
target: "bridge",
"Could not search for equivocations in the finality proof \
for source header {:?} synced at target block {block_num:?}: {e:?}",
synced_header.finality_proof.target_header_hash()
);
continue
},
};
for equivocation in equivocations {
self.report_equivocation(context.synced_header_hash, equivocation).await;
}
self.finality_proofs_buf
.prune(synced_header.finality_proof.target_header_number(), None);
context.update(synced_header);
}
}
async fn do_run(&mut self, tick: Duration, exit_signal: impl Future<Output = ()>) {
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
@@ -273,15 +104,16 @@ impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
// Check the available blocks
let mut current_block_number = from;
while current_block_number <= until {
let mut context =
match self.build_equivocation_reporting_context(current_block_number).await {
Some(context) => context,
None => {
current_block_number = current_block_number.saturating_add(1.into());
continue
},
};
self.check_block(current_block_number, &mut context).await;
self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
let block_checker = BlockChecker::new(current_block_number);
let _ = block_checker
.run(
&mut self.source_client,
&mut self.target_client,
&mut self.finality_proofs_buf,
&mut self.reporter,
)
.await;
current_block_number = current_block_number.saturating_add(1.into());
}
self.until_block_num = Some(current_block_number);