Merge commit 'e5bed7ac380b6adb54b60a2a72a2a8f07f50d6c1' as 'bridges'

This commit is contained in:
Hernando Castano
2021-04-21 11:56:23 -04:00
339 changed files with 71658 additions and 0 deletions
@@ -0,0 +1,21 @@
[package]
name = "finality-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
description = "Finality proofs relay"
[dependencies]
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
bp-header-chain = { path = "../../primitives/header-chain" }
futures = "0.3.5"
headers-relay = { path = "../headers" }
log = "0.4.11"
num-traits = "0.2"
relay-utils = { path = "../utils" }
[dev-dependencies]
parking_lot = "0.11.0"
@@ -0,0 +1,599 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! The loop basically reads all missing headers and their finality proofs from the source client.
//! The proof for the best possible header is then submitted to the target node. The only exception
//! is the mandatory headers, which we always submit to the target node. For such headers, we
//! assume that the persistent proof either exists, or will eventually become available.
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use futures::{select, Future, FutureExt, Stream, StreamExt};
use headers_relay::sync_loop_metrics::SyncLoopMetrics;
use num_traits::{One, Saturating};
use relay_utils::{
metrics::{GlobalMetrics, MetricsParams},
relay_loop::Client as RelayClient,
retry_backoff, FailedClient, MaybeConnectionError,
};
use std::{
pin::Pin,
time::{Duration, Instant},
};
/// Finality proof synchronization loop parameters.
#[derive(Debug, Clone)]
pub struct FinalitySyncParams {
/// Interval at which we check updates on both clients. Normally should be larger than
/// `min(source_block_time, target_block_time)`.
///
/// This parameter may be used to limit transactions rate. Increase the value && you'll get
/// infrequent updates => sparse headers => potential slow down of bridge applications, but pallet storage
/// won't be super large. Decrease the value to near `source_block_time` and you'll get
/// transaction for (almost) every block of the source chain => all source headers will be known
/// to the target chain => bridge applications will run faster, but pallet storage may explode
/// (but if pruning is there, then it's fine).
pub tick: Duration,
/// Number of finality proofs to keep in internal buffer between loop wakeups.
///
/// While in "major syncing" state, we still read finality proofs from the stream. They're stored
/// in the internal buffer between loop wakeups. When we're close to the tip of the chain, we may
/// meet finality delays if headers are not finalized frequently. So instead of waiting for next
/// finality proof to appear in the stream, we may use existing proof from that buffer.
pub recent_finality_proofs_limit: usize,
/// Timeout before we treat our transactions as lost and restart the whole sync process.
pub stall_timeout: Duration,
}
/// Source client used in finality synchronization loop.
#[async_trait]
pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
/// Stream of new finality proofs. The stream is allowed to miss proofs for some
/// headers, even if those headers are mandatory.
type FinalityProofsStream: Stream<Item = P::FinalityProof>;
/// Get best finalized block number.
async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;
/// Get canonical header and its finality proof by number.
async fn header_and_finality_proof(
&self,
number: P::Number,
) -> Result<(P::Header, Option<P::FinalityProof>), Self::Error>;
/// Subscribe to new finality proofs.
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Self::Error>;
}
/// Target client used in finality synchronization loop.
#[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
/// Get best finalized source block number.
async fn best_finalized_source_block_number(&self) -> Result<P::Number, Self::Error>;
/// Submit header finality proof.
async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs sync loop.
pub fn metrics_prefix<P: FinalitySyncPipeline>() -> String {
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME)
}
/// Run finality proofs synchronization loop.
pub async fn run<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
sync_params.clone(),
metrics,
exit_signal.clone(),
)
})
.await
}
/// Unjustified headers container. Ordered by header number.
pub(crate) type UnjustifiedHeaders<H> = Vec<H>;
/// Finality proofs container. Ordered by target header number.
pub(crate) type FinalityProofs<P> = Vec<(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)>;
/// Reference to finality proofs container.
pub(crate) type FinalityProofsRef<'a, P> = &'a [(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)];
/// Error that may happen inside finality synchronization loop.
#[derive(Debug)]
pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
/// Source client request has failed with given error.
Source(SourceError),
/// Target client request has failed with given error.
Target(TargetError),
/// Finality proof for mandatory header is missing from the source node.
MissingMandatoryFinalityProof(P::Number),
/// The synchronization has stalled.
Stalled,
}
impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
where
P: FinalitySyncPipeline,
SourceError: MaybeConnectionError,
TargetError: MaybeConnectionError,
{
fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
match *self {
Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
Error::Stalled => Err(FailedClient::Both),
_ => Ok(()),
}
}
}
/// Information about transaction that we have submitted.
#[derive(Debug, Clone)]
struct Transaction<Number> {
/// Time when we have submitted this transaction.
pub time: Instant,
/// The number of the header we have submitted.
pub submitted_header_number: Number,
}
/// Finality proofs stream that may be restarted.
pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted.
pub(crate) needs_restart: bool,
/// The stream itself.
stream: Pin<Box<S>>,
}
#[cfg(test)]
impl<S> From<S> for RestartableFinalityProofsStream<S> {
fn from(stream: S) -> Self {
RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(stream),
}
}
}
/// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress.
progress: &'a mut (Instant, Option<P::Number>),
/// Finality proofs stream.
finality_proofs_stream: &'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
/// Recent finality proofs that we have read from the stream.
recent_finality_proofs: &'a mut FinalityProofs<P>,
/// Last transaction that we have submitted to the target node.
last_transaction: Option<Transaction<P::Number>>,
}
async fn run_until_connection_lost<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let restart_finality_proofs_stream = || async {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);
FailedClient::Source
})
};
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
let mut finality_proofs_stream = RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(restart_finality_proofs_stream().await?),
};
let mut recent_finality_proofs = Vec::new();
let mut progress = (Instant::now(), None);
let mut retry_backoff = retry_backoff();
let mut last_transaction = None;
loop {
// run loop iteration
let iteration_result = run_loop_iteration(
&source_client,
&target_client,
FinalityLoopState {
progress: &mut progress,
finality_proofs_stream: &mut finality_proofs_stream,
recent_finality_proofs: &mut recent_finality_proofs,
last_transaction: last_transaction.clone(),
},
&sync_params,
&metrics_sync,
)
.await;
// deal with errors
let next_tick = match iteration_result {
Ok(updated_last_transaction) => {
last_transaction = updated_last_transaction;
retry_backoff.reset();
sync_params.tick
}
Err(error) => {
log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
error.fail_if_connection_error()?;
retry_backoff
.next_backoff()
.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
}
};
if finality_proofs_stream.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
finality_proofs_stream.needs_restart = false;
finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
}
// wait till exit signal, or new source block
select! {
_ = async_std::task::sleep(next_tick).fuse() => {},
_ = exit_signal => return Ok(()),
}
}
}
async fn run_loop_iteration<P, SC, TC>(
source_client: &SC,
target_client: &TC,
state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
sync_params: &FinalitySyncParams,
metrics_sync: &Option<SyncLoopMetrics>,
) -> Result<Option<Transaction<P::Number>>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
{
// read best source headers ids from source and target nodes
let best_number_at_source = source_client
.best_finalized_block_number()
.await
.map_err(Error::Source)?;
let best_number_at_target = target_client
.best_finalized_source_block_number()
.await
.map_err(Error::Target)?;
if let Some(ref metrics_sync) = *metrics_sync {
metrics_sync.update_best_block_at_source(best_number_at_source);
metrics_sync.update_best_block_at_target(best_number_at_target);
}
*state.progress = print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);
// if we have already submitted header, then we just need to wait for it
// if we're waiting too much, then we believe our transaction has been lost and restart sync
if let Some(last_transaction) = state.last_transaction {
if best_number_at_target >= last_transaction.submitted_header_number {
// transaction has been mined && we can continue
} else if last_transaction.time.elapsed() > sync_params.stall_timeout {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
return Err(Error::Stalled);
} else {
return Ok(Some(last_transaction));
}
}
// submit new header if we have something new
match select_header_to_submit(
source_client,
target_client,
state.finality_proofs_stream,
state.recent_finality_proofs,
best_number_at_source,
best_number_at_target,
sync_params,
)
.await?
{
Some((header, justification)) => {
let new_transaction = Transaction {
time: Instant::now(),
submitted_header_number: header.number(),
};
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
new_transaction.submitted_header_number,
P::TARGET_NAME,
);
target_client
.submit_finality_proof(header, justification)
.await
.map_err(Error::Target)?;
Ok(Some(new_transaction))
}
None => Ok(None),
}
}
async fn select_header_to_submit<P, SC, TC>(
source_client: &SC,
target_client: &TC,
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
recent_finality_proofs: &mut FinalityProofs<P>,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
sync_params: &FinalitySyncParams,
) -> Result<Option<(P::Header, P::FinalityProof)>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
{
// to see that the loop is progressing
log::trace!(
target: "bridge",
"Considering range of headers ({:?}; {:?}]",
best_number_at_target,
best_number_at_source,
);
// read missing headers. if we see that the header schedules GRANDPA change, we need to
// submit this header
let selected_finality_proof = read_missing_headers::<P, SC, TC>(
source_client,
target_client,
best_number_at_source,
best_number_at_target,
)
.await?;
let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))),
SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => {
(unjustified_headers, Some((header, finality_proof)))
}
SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None),
};
// all headers that are missing from the target client are non-mandatory
// => even if we have already selected some header and its persistent finality proof,
// we may try to select better header by reading non-persistent proofs from the stream
read_finality_proofs_from_stream::<P, _>(finality_proofs_stream, recent_finality_proofs);
selected_finality_proof = select_better_recent_finality_proof::<P>(
recent_finality_proofs,
&mut unjustified_headers,
selected_finality_proof,
);
// remove obsolete 'recent' finality proofs + keep its size under certain limit
let oldest_finality_proof_to_keep = selected_finality_proof
.as_ref()
.map(|(header, _)| header.number())
.unwrap_or(best_number_at_target);
prune_recent_finality_proofs::<P>(
oldest_finality_proof_to_keep,
recent_finality_proofs,
sync_params.recent_finality_proofs_limit,
);
Ok(selected_finality_proof)
}
/// Finality proof that has been selected by the `read_missing_headers` function.
pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
/// Mandatory header and its proof has been selected. We shall submit proof for this header.
Mandatory(Header, FinalityProof),
/// Regular header and its proof has been selected. We may submit this proof, or proof for
/// some better header.
Regular(UnjustifiedHeaders<Header>, Header, FinalityProof),
/// We haven't found any missing header with persistent proof at the target client.
None(UnjustifiedHeaders<Header>),
}
/// Read missing headers and their persistent finality proofs from the target client.
///
/// If we have found some header with known proof, it is returned.
/// Otherwise, `SelectedFinalityProof::None` is returned.
///
/// Unless we have found mandatory header, all missing headers are collected and returned.
pub(crate) async fn read_missing_headers<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>>(
source_client: &SC,
_target_client: &TC,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> Result<SelectedFinalityProof<P::Header, P::FinalityProof>, Error<P, SC::Error, TC::Error>> {
let mut unjustified_headers = Vec::new();
let mut selected_finality_proof = None;
let mut header_number = best_number_at_target + One::one();
while header_number <= best_number_at_source {
let (header, finality_proof) = source_client
.header_and_finality_proof(header_number)
.await
.map_err(Error::Source)?;
let is_mandatory = header.is_mandatory();
match (is_mandatory, finality_proof) {
(true, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
return Ok(SelectedFinalityProof::Mandatory(header, finality_proof));
}
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
unjustified_headers.clear();
selected_finality_proof = Some((header, finality_proof));
}
(false, None) => {
unjustified_headers.push(header);
}
}
header_number = header_number + One::one();
}
Ok(match selected_finality_proof {
Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof),
None => SelectedFinalityProof::None(unjustified_headers),
})
}
/// Read finality proofs from the stream.
pub(crate) fn read_finality_proofs_from_stream<P: FinalitySyncPipeline, FPS: Stream<Item = P::FinalityProof>>(
finality_proofs_stream: &mut RestartableFinalityProofsStream<FPS>,
recent_finality_proofs: &mut FinalityProofs<P>,
) {
loop {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Some(Some(finality_proof)) => finality_proof,
Some(None) => {
finality_proofs_stream.needs_restart = true;
break;
}
None => break,
};
recent_finality_proofs.push((finality_proof.target_header_number(), finality_proof));
}
}
/// Try to select better header and its proof, given finality proofs that we
/// have recently read from the stream.
pub(crate) fn select_better_recent_finality_proof<P: FinalitySyncPipeline>(
recent_finality_proofs: FinalityProofsRef<P>,
unjustified_headers: &mut UnjustifiedHeaders<P::Header>,
selected_finality_proof: Option<(P::Header, P::FinalityProof)>,
) -> Option<(P::Header, P::FinalityProof)> {
if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() {
return selected_finality_proof;
}
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;
// find last proof from intersection
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
if !intersection.contains(selected_header_number) {
return selected_finality_proof;
}
// now remove all obsolete headers and extract selected header
let selected_header_position = unjustified_headers
.binary_search_by_key(selected_header_number, |header| header.number())
.expect("unjustified_headers contain all headers from intersection; qed");
let selected_header = unjustified_headers.swap_remove(selected_header_position);
Some((selected_header, finality_proof.clone()))
}
pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
justified_header_number: P::Number,
recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize,
) {
let position =
recent_finality_proofs.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number);
// remove all obsolete elements
*recent_finality_proofs = recent_finality_proofs.split_off(
position
.map(|position| position + 1)
.unwrap_or_else(|position| position),
);
// now - limit vec by size
let split_index = recent_finality_proofs
.len()
.saturating_sub(recent_finality_proofs_limit);
*recent_finality_proofs = recent_finality_proofs.split_off(split_index);
}
fn print_sync_progress<P: FinalitySyncPipeline>(
progress_context: (Instant, Option<P::Number>),
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> (Instant, Option<P::Number>) {
let (prev_time, prev_best_number_at_target) = progress_context;
let now = Instant::now();
let need_update = now - prev_time > Duration::from_secs(10)
|| prev_best_number_at_target
.map(|prev_best_number_at_target| {
best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into()
})
.unwrap_or(true);
if !need_update {
return (prev_time, prev_best_number_at_target);
}
log::info!(
target: "bridge",
"Synced {:?} of {:?} headers",
best_number_at_target,
best_number_at_source,
);
(now, Some(best_number_at_target))
}
@@ -0,0 +1,404 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Tests for finality synchronization loop.
#![cfg(test)]
use crate::finality_loop::{
prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof,
FinalityProofs, FinalitySyncParams, SourceClient, TargetClient,
};
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, MaybeConnectionError};
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
type IsMandatory = bool;
type TestNumber = u64;
#[derive(Debug, Clone)]
enum TestError {
NonConnection,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
#[derive(Debug, Clone)]
struct TestFinalitySyncPipeline;
impl FinalitySyncPipeline for TestFinalitySyncPipeline {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type Hash = u64;
type Number = TestNumber;
type Header = TestSourceHeader;
type FinalityProof = TestFinalityProof;
}
#[derive(Debug, Clone, PartialEq)]
struct TestSourceHeader(IsMandatory, TestNumber);
impl SourceHeader<TestNumber> for TestSourceHeader {
fn number(&self) -> TestNumber {
self.1
}
fn is_mandatory(&self) -> bool {
self.0
}
}
#[derive(Debug, Clone, PartialEq)]
struct TestFinalityProof(TestNumber);
impl FinalityProof<TestNumber> for TestFinalityProof {
fn target_header_number(&self) -> TestNumber {
self.0
}
}
#[derive(Debug, Clone, Default)]
struct ClientsData {
source_best_block_number: TestNumber,
source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
source_proofs: Vec<TestFinalityProof>,
target_best_block_number: TestNumber,
target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
}
#[derive(Clone)]
struct TestSourceClient {
on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestSourceClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof>>>;
async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
Ok(data.source_best_block_number)
}
async fn header_and_finality_proof(
&self,
number: TestNumber,
) -> Result<(TestSourceHeader, Option<TestFinalityProof>), TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
data.source_headers
.get(&number)
.cloned()
.ok_or(TestError::NonConnection)
}
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
Ok(futures::stream::iter(data.source_proofs.clone()).boxed())
}
}
#[derive(Clone)]
struct TestTargetClient {
on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestTargetClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
async fn best_finalized_source_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
Ok(data.target_best_block_number)
}
async fn submit_finality_proof(&self, header: TestSourceHeader, proof: TestFinalityProof) -> Result<(), TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
data.target_best_block_number = header.number();
data.target_headers.push((header, proof));
Ok(())
}
}
fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData {
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> = Arc::new(move |data| {
if state_function(data) {
exit_sender.unbounded_send(()).unwrap();
}
});
let clients_data = Arc::new(Mutex::new(ClientsData {
source_best_block_number: 10,
source_headers: vec![
(6, (TestSourceHeader(false, 6), None)),
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))),
(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))),
(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))),
(10, (TestSourceHeader(false, 10), None)),
]
.into_iter()
.collect(),
source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
target_best_block_number: 5,
target_headers: vec![],
}));
let source_client = TestSourceClient {
on_method_call: internal_state_function.clone(),
data: clients_data.clone(),
};
let target_client = TestTargetClient {
on_method_call: internal_state_function,
data: clients_data.clone(),
};
let sync_params = FinalitySyncParams {
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
};
let _ = async_std::task::block_on(run(
source_client,
target_client,
sync_params,
MetricsParams::disabled(),
exit_receiver.into_future().map(|(_, _)| ()),
));
let clients_data = clients_data.lock().clone();
clients_data
}
#[test]
fn finality_sync_loop_works() {
let client_data = run_sync_loop(|data| {
// header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, because
// header#8 has persistent finality proof && it is mandatory => it is submitted
// header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because
// there are no more persistent finality proofs
//
// once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from the stream
if data.target_best_block_number == 9 {
data.source_best_block_number = 14;
data.source_headers.insert(11, (TestSourceHeader(false, 11), None));
data.source_headers
.insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(12))));
data.source_headers.insert(13, (TestSourceHeader(false, 13), None));
data.source_headers
.insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(14))));
}
// once this ^^^ is done, we generate more blocks && read persistent proof for block 16
if data.target_best_block_number == 14 {
data.source_best_block_number = 17;
data.source_headers.insert(15, (TestSourceHeader(false, 15), None));
data.source_headers
.insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(16))));
data.source_headers.insert(17, (TestSourceHeader(false, 17), None));
}
data.target_best_block_number == 16
});
assert_eq!(
client_data.target_headers,
vec![
// before adding 11..14: finality proof for mandatory header#8
(TestSourceHeader(true, 8), TestFinalityProof(8)),
// before adding 11..14: persistent finality proof for non-mandatory header#9
(TestSourceHeader(false, 9), TestFinalityProof(9)),
// after adding 11..14: ephemeral finality proof for non-mandatory header#14
(TestSourceHeader(false, 14), TestFinalityProof(14)),
// after adding 15..17: persistent finality proof for non-mandatory header#16
(TestSourceHeader(false, 16), TestFinalityProof(16)),
],
);
}
#[test]
fn select_better_recent_finality_proof_works() {
// if there are no unjustified headers, nothing is changed
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(5, TestFinalityProof(5))],
&mut vec![],
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
// if there are no recent finality proofs, nothing is changed
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[],
&mut vec![TestSourceHeader(false, 5)],
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
// if there's no intersection between recent finality proofs and unjustified headers, nothing is changed
let mut unjustified_headers = vec![TestSourceHeader(false, 9), TestSourceHeader(false, 10)];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
// if there's intersection between recent finality proofs and unjustified headers, but there are no
// proofs in this intersection, nothing is changed
let mut unjustified_headers = vec![
TestSourceHeader(false, 8),
TestSourceHeader(false, 9),
TestSourceHeader(false, 10),
];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
assert_eq!(
unjustified_headers,
vec![
TestSourceHeader(false, 8),
TestSourceHeader(false, 9),
TestSourceHeader(false, 10)
]
);
// if there's intersection between recent finality proofs and unjustified headers and there's
// a proof in this intersection:
// - this better (last from intersection) proof is selected;
// - 'obsolete' unjustified headers are pruned.
let mut unjustified_headers = vec![
TestSourceHeader(false, 8),
TestSourceHeader(false, 9),
TestSourceHeader(false, 10),
];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 9), TestFinalityProof(9))),
);
}
#[test]
fn read_finality_proofs_from_stream_works() {
// when stream is currently empty, nothing is changed
let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))];
let mut stream = futures::stream::pending().into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]);
assert_eq!(stream.needs_restart, false);
// when stream has entry with target, it is added to the recent proofs container
let mut stream = futures::stream::iter(vec![TestFinalityProof(4)])
.chain(futures::stream::pending())
.into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
assert_eq!(
recent_finality_proofs,
vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
);
assert_eq!(stream.needs_restart, false);
// when stream has ended, we'll need to restart it
let mut stream = futures::stream::empty().into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
assert_eq!(
recent_finality_proofs,
vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
);
assert_eq!(stream.needs_restart, true);
}
#[test]
fn prune_recent_finality_proofs_works() {
let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![
(10, TestFinalityProof(10)),
(13, TestFinalityProof(13)),
(15, TestFinalityProof(15)),
(17, TestFinalityProof(17)),
(19, TestFinalityProof(19)),
]
.into_iter()
.collect();
// when there's proof for justified header in the vec
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 1024);
assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
// when there are no proof for justified header in the vec
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(11, &mut recent_finality_proofs, 1024);
assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
// when there are too many entries after initial prune && they also need to be pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[3..], recent_finality_proofs,);
// when last entry is pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(19, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
// when post-last entry is pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(20, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
}
@@ -0,0 +1,53 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! This crate has single entrypoint to run synchronization loop that is built around finality
//! proofs, as opposed to headers synchronization loop, which is built around headers. The headers
//! are still submitted to the target node, but are treated as auxiliary data as we are not trying
//! to submit all source headers to the target node.
pub use crate::finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient};
use bp_header_chain::FinalityProof;
use std::fmt::Debug;
mod finality_loop;
mod finality_loop_tests;
/// Finality proofs synchronization pipeline.
pub trait FinalitySyncPipeline: Clone + Debug + Send + Sync {
/// Name of the finality proofs source.
const SOURCE_NAME: &'static str;
/// Name of the finality proofs target.
const TARGET_NAME: &'static str;
/// Headers we're syncing are identified by this hash.
type Hash: Eq + Clone + Copy + Send + Sync + Debug;
/// Headers we're syncing are identified by this number.
type Number: relay_utils::BlockNumberBase;
/// Type of header that we're syncing.
type Header: SourceHeader<Self::Number>;
/// Finality proof type.
type FinalityProof: FinalityProof<Self::Number>;
}
/// Header that we're receiving from source node.
pub trait SourceHeader<Number>: Clone + Debug + PartialEq + Send + Sync {
/// Returns number of header.
fn number(&self) -> Number;
/// Returns true if this header needs to be submitted to target node.
fn is_mandatory(&self) -> bool;
}