Finality loop refactoring (#2357)

This commit is contained in:
Serban Iorga
2023-08-16 12:42:51 +03:00
committed by Bastian Köcher
parent 48cae06a77
commit dc8aa5df7d
9 changed files with 1268 additions and 1221 deletions
+1 -1
View File
@@ -12,7 +12,7 @@ async-trait = "0.1"
backoff = "0.4"
bp-header-chain = { path = "../../primitives/header-chain" }
futures = "0.3.28"
log = "0.4.17"
log = "0.4.20"
num-traits = "0.2"
relay-utils = { path = "../utils" }
File diff suppressed because it is too large Load Diff
@@ -1,604 +0,0 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Tests for finality synchronization loop.
#![cfg(test)]
use crate::{
finality_loop::{
prune_recent_finality_proofs, read_finality_proofs_from_stream, run_loop_iteration,
run_until_connection_lost, select_better_recent_finality_proof, select_header_to_submit,
FinalityLoopState, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream,
SourceClient, TargetClient,
},
sync_loop_metrics::SyncLoopMetrics,
FinalityPipeline, FinalitySyncPipeline, SourceClientBase, SourceHeader,
};
use async_trait::async_trait;
use bp_header_chain::{FinalityProof, GrandpaConsensusLogReader};
use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use relay_utils::{
relay_loop::Client as RelayClient, FailedClient, HeaderId, MaybeConnectionError,
TrackedTransactionStatus, TransactionTracker,
};
use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
type IsMandatory = bool;
type TestNumber = u64;
type TestHash = u64;
#[derive(Clone, Debug)]
struct TestTransactionTracker(TrackedTransactionStatus<HeaderId<TestHash, TestNumber>>);
impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
type HeaderId = HeaderId<TestHash, TestNumber>;
async fn wait(self) -> TrackedTransactionStatus<HeaderId<TestHash, TestNumber>> {
self.0
}
}
#[derive(Debug, Clone)]
enum TestError {
NonConnection,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
#[derive(Debug, Clone)]
struct TestFinalitySyncPipeline;
impl FinalityPipeline for TestFinalitySyncPipeline {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type Hash = TestHash;
type Number = TestNumber;
type FinalityProof = TestFinalityProof;
}
impl FinalitySyncPipeline for TestFinalitySyncPipeline {
type ConsensusLogReader = GrandpaConsensusLogReader<TestNumber>;
type Header = TestSourceHeader;
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestSourceHeader(IsMandatory, TestNumber, TestHash);
impl SourceHeader<TestHash, TestNumber, GrandpaConsensusLogReader<TestNumber>>
for TestSourceHeader
{
fn hash(&self) -> TestHash {
self.2
}
fn number(&self) -> TestNumber {
self.1
}
fn is_mandatory(&self) -> bool {
self.0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestFinalityProof(TestNumber);
impl FinalityProof<TestNumber> for TestFinalityProof {
fn target_header_number(&self) -> TestNumber {
self.0
}
}
#[derive(Debug, Clone, Default)]
struct ClientsData {
source_best_block_number: TestNumber,
source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
source_proofs: Vec<TestFinalityProof>,
target_best_block_id: HeaderId<TestHash, TestNumber>,
target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
target_transaction_tracker: TestTransactionTracker,
}
#[derive(Clone)]
struct TestSourceClient {
on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestSourceClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl SourceClientBase<TestFinalitySyncPipeline> for TestSourceClient {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof> + 'static + Send>>;
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
Ok(futures::stream::iter(data.source_proofs.clone()).boxed())
}
}
#[async_trait]
impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
Ok(data.source_best_block_number)
}
async fn header_and_finality_proof(
&self,
number: TestNumber,
) -> Result<(TestSourceHeader, Option<TestFinalityProof>), TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
data.source_headers.get(&number).cloned().ok_or(TestError::NonConnection)
}
}
#[derive(Clone)]
struct TestTargetClient {
on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestTargetClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
type TransactionTracker = TestTransactionTracker;
async fn best_finalized_source_block_id(
&self,
) -> Result<HeaderId<TestHash, TestNumber>, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
Ok(data.target_best_block_id)
}
async fn submit_finality_proof(
&self,
header: TestSourceHeader,
proof: TestFinalityProof,
) -> Result<TestTransactionTracker, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
data.target_best_block_id = HeaderId(header.number(), header.hash());
data.target_headers.push((header, proof));
(self.on_method_call)(&mut data);
Ok(data.target_transaction_tracker.clone())
}
}
fn prepare_test_clients(
exit_sender: futures::channel::mpsc::UnboundedSender<()>,
state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
) -> (TestSourceClient, TestTargetClient) {
let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> =
Arc::new(move |data| {
if state_function(data) {
exit_sender.unbounded_send(()).unwrap();
}
});
let clients_data = Arc::new(Mutex::new(ClientsData {
source_best_block_number: 10,
source_headers,
source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
target_best_block_id: HeaderId(5, 5),
target_headers: vec![],
target_transaction_tracker: TestTransactionTracker(TrackedTransactionStatus::Finalized(
Default::default(),
)),
}));
(
TestSourceClient {
on_method_call: internal_state_function.clone(),
data: clients_data.clone(),
},
TestTargetClient { on_method_call: internal_state_function, data: clients_data },
)
}
fn test_sync_params() -> FinalitySyncParams {
FinalitySyncParams {
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
only_mandatory_headers: false,
}
}
fn run_sync_loop(
state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
) -> (ClientsData, Result<(), FailedClient>) {
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
let (source_client, target_client) = prepare_test_clients(
exit_sender,
state_function,
vec![
(5, (TestSourceHeader(false, 5, 5), None)),
(6, (TestSourceHeader(false, 6, 6), None)),
(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
(8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))),
(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
(10, (TestSourceHeader(false, 10, 10), None)),
]
.into_iter()
.collect(),
);
let sync_params = test_sync_params();
let clients_data = source_client.data.clone();
let result = async_std::task::block_on(run_until_connection_lost(
source_client,
target_client,
sync_params,
None,
exit_receiver.into_future().map(|(_, _)| ()),
));
let clients_data = clients_data.lock().clone();
(clients_data, result)
}
#[test]
fn finality_sync_loop_works() {
let (client_data, result) = run_sync_loop(|data| {
// header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted,
// because header#8 has persistent finality proof && it is mandatory => it is submitted
// header#9 has persistent finality proof, but it isn't mandatory => it is submitted,
// because there are no more persistent finality proofs
//
// once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from
// the stream
if data.target_best_block_id.0 == 9 {
data.source_best_block_number = 14;
data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None));
data.source_headers
.insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12))));
data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None));
data.source_headers
.insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14))));
}
// once this ^^^ is done, we generate more blocks && read persistent proof for block 16
if data.target_best_block_id.0 == 14 {
data.source_best_block_number = 17;
data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None));
data.source_headers
.insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16))));
data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None));
}
data.target_best_block_id.0 == 16
});
assert_eq!(result, Ok(()));
assert_eq!(
client_data.target_headers,
vec![
// before adding 11..14: finality proof for mandatory header#8
(TestSourceHeader(true, 8, 8), TestFinalityProof(8)),
// before adding 11..14: persistent finality proof for non-mandatory header#9
(TestSourceHeader(false, 9, 9), TestFinalityProof(9)),
// after adding 11..14: ephemeral finality proof for non-mandatory header#14
(TestSourceHeader(false, 14, 14), TestFinalityProof(14)),
// after adding 15..17: persistent finality proof for non-mandatory header#16
(TestSourceHeader(false, 16, 16), TestFinalityProof(16)),
],
);
}
fn run_only_mandatory_headers_mode_test(
only_mandatory_headers: bool,
has_mandatory_headers: bool,
) -> Option<(TestSourceHeader, TestFinalityProof)> {
let (exit_sender, _) = futures::channel::mpsc::unbounded();
let (source_client, target_client) = prepare_test_clients(
exit_sender,
|_| false,
vec![
(6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))),
(7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
(8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))),
(9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
(10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))),
]
.into_iter()
.collect(),
);
async_std::task::block_on(select_header_to_submit(
&source_client,
&target_client,
&mut RestartableFinalityProofsStream::from(futures::stream::empty().boxed()),
&mut vec![],
10,
5,
&FinalitySyncParams {
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 0,
stall_timeout: Duration::from_secs(0),
only_mandatory_headers,
},
))
.unwrap()
}
#[test]
fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_headers_are_required() {
assert_eq!(run_only_mandatory_headers_mode_test(true, false), None);
assert_eq!(
run_only_mandatory_headers_mode_test(false, false),
Some((TestSourceHeader(false, 10, 10), TestFinalityProof(10))),
);
}
#[test]
fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() {
assert_eq!(
run_only_mandatory_headers_mode_test(true, true),
Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))),
);
assert_eq!(
run_only_mandatory_headers_mode_test(false, true),
Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))),
);
}
#[test]
fn select_better_recent_finality_proof_works() {
// if there are no unjustified headers, nothing is changed
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(5, TestFinalityProof(5))],
&mut vec![],
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
);
// if there are no recent finality proofs, nothing is changed
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[],
&mut vec![TestSourceHeader(false, 5, 5)],
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
);
// if there's no intersection between recent finality proofs and unjustified headers, nothing is
// changed
let mut unjustified_headers =
vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
);
// if there's intersection between recent finality proofs and unjustified headers, but there are
// no proofs in this intersection, nothing is changed
let mut unjustified_headers = vec![
TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10),
];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
);
assert_eq!(
unjustified_headers,
vec![
TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10)
]
);
// if there's intersection between recent finality proofs and unjustified headers and there's
// a proof in this intersection:
// - this better (last from intersection) proof is selected;
// - 'obsolete' unjustified headers are pruned.
let mut unjustified_headers = vec![
TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10),
];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 9, 9), TestFinalityProof(9))),
);
}
#[test]
fn read_finality_proofs_from_stream_works() {
// when stream is currently empty, nothing is changed
let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))];
let mut stream = futures::stream::pending().into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(
&mut stream,
&mut recent_finality_proofs,
);
assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]);
assert!(!stream.needs_restart);
// when stream has entry with target, it is added to the recent proofs container
let mut stream = futures::stream::iter(vec![TestFinalityProof(4)])
.chain(futures::stream::pending())
.into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(
&mut stream,
&mut recent_finality_proofs,
);
assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]);
assert!(!stream.needs_restart);
// when stream has ended, we'll need to restart it
let mut stream = futures::stream::empty().into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(
&mut stream,
&mut recent_finality_proofs,
);
assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]);
assert!(stream.needs_restart);
}
#[test]
fn prune_recent_finality_proofs_works() {
let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![
(10, TestFinalityProof(10)),
(13, TestFinalityProof(13)),
(15, TestFinalityProof(15)),
(17, TestFinalityProof(17)),
(19, TestFinalityProof(19)),
]
.into_iter()
.collect();
// when there's proof for justified header in the vec
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 1024);
assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
// when there are no proof for justified header in the vec
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(11, &mut recent_finality_proofs, 1024);
assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
// when there are too many entries after initial prune && they also need to be pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[3..], recent_finality_proofs,);
// when last entry is pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(19, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
// when post-last entry is pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(20, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
}
#[test]
fn different_forks_at_source_and_at_target_are_detected() {
let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
let (source_client, target_client) = prepare_test_clients(
exit_sender,
|_| false,
vec![
(5, (TestSourceHeader(false, 5, 42), None)),
(6, (TestSourceHeader(false, 6, 6), None)),
(7, (TestSourceHeader(false, 7, 7), None)),
(8, (TestSourceHeader(false, 8, 8), None)),
(9, (TestSourceHeader(false, 9, 9), None)),
(10, (TestSourceHeader(false, 10, 10), None)),
]
.into_iter()
.collect(),
);
let mut progress = (Instant::now(), None);
let mut finality_proofs_stream = futures::stream::iter(vec![]).boxed().into();
let mut recent_finality_proofs = Vec::new();
let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
async_std::task::block_on(run_loop_iteration::<TestFinalitySyncPipeline, _, _>(
&source_client,
&target_client,
FinalityLoopState {
progress: &mut progress,
finality_proofs_stream: &mut finality_proofs_stream,
recent_finality_proofs: &mut recent_finality_proofs,
submitted_header_number: None,
},
&test_sync_params(),
&Some(metrics_sync.clone()),
))
.unwrap();
assert!(!metrics_sync.is_using_same_fork());
}
#[test]
fn stalls_when_transaction_tracker_returns_error() {
let (_, result) = run_sync_loop(|data| {
data.target_transaction_tracker = TestTransactionTracker(TrackedTransactionStatus::Lost);
data.target_best_block_id = HeaderId(5, 5);
data.target_best_block_id.0 == 16
});
assert_eq!(result, Err(FailedClient::Both));
}
#[test]
fn stalls_when_transaction_tracker_returns_finalized_but_transaction_fails() {
let (_, result) = run_sync_loop(|data| {
data.target_best_block_id = HeaderId(5, 5);
data.target_best_block_id.0 == 16
});
assert_eq!(result, Err(FailedClient::Both));
}
@@ -0,0 +1,227 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{base::SourceClientBase, FinalityPipeline};
use bp_header_chain::FinalityProof;
use futures::{FutureExt, Stream, StreamExt};
use std::pin::Pin;
/// Finality proofs container. Ordered by target header number.
pub type FinalityProofs<P> =
Vec<(<P as FinalityPipeline>::Number, <P as FinalityPipeline>::FinalityProof)>;
/// Source finality proofs stream that may be restarted.
pub struct FinalityProofsStream<P: FinalityPipeline, SC: SourceClientBase<P>> {
/// The underlying stream.
stream: Option<Pin<Box<SC::FinalityProofsStream>>>,
}
impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
pub fn new() -> Self {
Self { stream: None }
}
fn next(&mut self) -> Option<<SC::FinalityProofsStream as Stream>::Item> {
let stream = match &mut self.stream {
Some(stream) => stream,
None => return None,
};
match stream.next().now_or_never() {
Some(Some(finality_proof)) => Some(finality_proof),
Some(None) => {
self.stream = None;
None
},
None => None,
}
}
pub async fn ensure_stream(&mut self, source_client: &SC) -> Result<(), SC::Error> {
if self.stream.is_none() {
log::warn!(target: "bridge", "{} finality proofs stream is being started / restarted",
P::SOURCE_NAME);
let stream = source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}",
P::SOURCE_NAME,
error,
);
error
})?;
self.stream = Some(Box::pin(stream));
}
Ok(())
}
}
/// Source finality proofs buffer.
pub struct FinalityProofsBuf<P: FinalityPipeline> {
/// Proofs buffer.
buf: FinalityProofs<P>,
}
impl<P: FinalityPipeline> FinalityProofsBuf<P> {
pub fn new(buf: FinalityProofs<P>) -> Self {
Self { buf }
}
pub fn buf(&self) -> &FinalityProofs<P> {
&self.buf
}
pub fn fill<SC: SourceClientBase<P>>(&mut self, stream: &mut FinalityProofsStream<P, SC>) {
let mut proofs_count = 0;
let mut first_header_number = None;
let mut last_header_number = None;
while let Some(finality_proof) = stream.next() {
let target_header_number = finality_proof.target_header_number();
first_header_number.get_or_insert(target_header_number);
last_header_number = Some(target_header_number);
proofs_count += 1;
self.buf.push((target_header_number, finality_proof));
}
if proofs_count != 0 {
log::trace!(
target: "bridge",
"Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
proofs_count,
P::SOURCE_NAME,
first_header_number,
last_header_number,
);
}
}
pub fn prune(&mut self, until_hdr_num: P::Number, buf_limit: usize) {
let kept_hdr_idx = self
.buf
.binary_search_by_key(&until_hdr_num, |(hdr_num, _)| *hdr_num)
.map(|idx| idx + 1)
.unwrap_or_else(|idx| idx);
let buf_limit_idx = self.buf.len().saturating_sub(buf_limit);
self.buf = self.buf.split_off(std::cmp::max(kept_hdr_idx, buf_limit_idx));
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::*;
impl<P: FinalityPipeline, SC: SourceClientBase<P>> FinalityProofsStream<P, SC> {
fn from_stream(stream: SC::FinalityProofsStream) -> Self {
Self { stream: Some(Box::pin(stream)) }
}
}
#[test]
fn finality_proofs_buf_fill_works() {
// when stream is currently empty, nothing is changed
let mut finality_proofs_buf =
FinalityProofsBuf::<TestFinalitySyncPipeline> { buf: vec![(1, TestFinalityProof(1))] };
let mut stream =
FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
Box::pin(futures::stream::pending()),
);
finality_proofs_buf.fill(&mut stream);
assert_eq!(finality_proofs_buf.buf, vec![(1, TestFinalityProof(1))]);
assert!(stream.stream.is_some());
// when stream has entry with target, it is added to the recent proofs container
let mut stream =
FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
Box::pin(
futures::stream::iter(vec![TestFinalityProof(4)])
.chain(futures::stream::pending()),
),
);
finality_proofs_buf.fill(&mut stream);
assert_eq!(
finality_proofs_buf.buf,
vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
);
assert!(stream.stream.is_some());
// when stream has ended, we'll need to restart it
let mut stream =
FinalityProofsStream::<TestFinalitySyncPipeline, TestSourceClient>::from_stream(
Box::pin(futures::stream::empty()),
);
finality_proofs_buf.fill(&mut stream);
assert_eq!(
finality_proofs_buf.buf,
vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
);
assert!(stream.stream.is_none());
}
#[test]
fn finality_proofs_buf_prune_works() {
let original_finality_proofs_buf: FinalityProofs<TestFinalitySyncPipeline> = vec![
(10, TestFinalityProof(10)),
(13, TestFinalityProof(13)),
(15, TestFinalityProof(15)),
(17, TestFinalityProof(17)),
(19, TestFinalityProof(19)),
]
.into_iter()
.collect();
// when there's proof for justified header in the vec
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(10, 1024);
assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,);
// when there are no proof for justified header in the vec
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(11, 1024);
assert_eq!(&original_finality_proofs_buf[1..], finality_proofs_buf.buf,);
// when there are too many entries after initial prune && they also need to be pruned
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(10, 2);
assert_eq!(&original_finality_proofs_buf[3..], finality_proofs_buf.buf,);
// when last entry is pruned
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(19, 2);
assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,);
// when post-last entry is pruned
let mut finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline> {
buf: original_finality_proofs_buf.clone(),
};
finality_proofs_buf.prune(20, 2);
assert_eq!(&original_finality_proofs_buf[5..], finality_proofs_buf.buf,);
}
}
+237
View File
@@ -0,0 +1,237 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{
finality_loop::SyncInfo, finality_proofs::FinalityProofsBuf, Error, FinalitySyncPipeline,
SourceClient, SourceHeader, TargetClient,
};
use std::cmp::Ordering;
/// Unjustified headers container. Ordered by header number.
pub type UnjustifiedHeaders<H> = Vec<H>;
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, PartialEq))]
pub struct JustifiedHeader<P: FinalitySyncPipeline> {
pub header: P::Header,
pub proof: P::FinalityProof,
}
impl<P: FinalitySyncPipeline> JustifiedHeader<P> {
pub fn number(&self) -> P::Number {
self.header.number()
}
}
/// Finality proof that has been selected by the `read_missing_headers` function.
pub enum JustifiedHeaderSelector<P: FinalitySyncPipeline> {
/// Mandatory header and its proof has been selected. We shall submit proof for this header.
Mandatory(JustifiedHeader<P>),
/// Regular header and its proof has been selected. We may submit this proof, or proof for
/// some better header.
Regular(UnjustifiedHeaders<P::Header>, JustifiedHeader<P>),
/// We haven't found any missing header with persistent proof at the target client.
None(UnjustifiedHeaders<P::Header>),
}
impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
pub(crate) async fn new<SC: SourceClient<P>, TC: TargetClient<P>>(
source_client: &SC,
info: &SyncInfo<P>,
) -> Result<Self, Error<P, SC::Error, TC::Error>> {
let mut unjustified_headers = Vec::new();
let mut maybe_justified_header = None;
let mut header_number = info.best_number_at_target + 1.into();
while header_number <= info.best_number_at_source {
let (header, maybe_proof) = source_client
.header_and_finality_proof(header_number)
.await
.map_err(Error::Source)?;
match (header.is_mandatory(), maybe_proof) {
(true, Some(proof)) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
return Ok(Self::Mandatory(JustifiedHeader { header, proof }))
},
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
unjustified_headers.clear();
maybe_justified_header = Some(JustifiedHeader { header, proof });
},
(false, None) => {
unjustified_headers.push(header);
},
}
header_number = header_number + 1.into();
}
log::trace!(
target: "bridge",
"Read {} {} headers. Selected finality proof for header: {:?}",
info.num_headers(),
P::SOURCE_NAME,
maybe_justified_header.as_ref().map(|justified_header| &justified_header.header),
);
Ok(match maybe_justified_header {
Some(justified_header) => Self::Regular(unjustified_headers, justified_header),
None => Self::None(unjustified_headers),
})
}
pub fn select_mandatory(self) -> Option<JustifiedHeader<P>> {
match self {
JustifiedHeaderSelector::Mandatory(header) => Some(header),
_ => None,
}
}
pub fn select(self, buf: &FinalityProofsBuf<P>) -> Option<JustifiedHeader<P>> {
let (unjustified_headers, maybe_justified_header) = match self {
JustifiedHeaderSelector::Mandatory(justified_header) => return Some(justified_header),
JustifiedHeaderSelector::Regular(unjustified_headers, justified_header) =>
(unjustified_headers, Some(justified_header)),
JustifiedHeaderSelector::None(unjustified_headers) => (unjustified_headers, None),
};
let mut finality_proofs_iter = buf.buf().iter().rev();
let mut maybe_finality_proof = finality_proofs_iter.next();
let mut unjustified_headers_iter = unjustified_headers.iter().rev();
let mut maybe_unjustified_header = unjustified_headers_iter.next();
while let (Some(finality_proof), Some(unjustified_header)) =
(maybe_finality_proof, maybe_unjustified_header)
{
match finality_proof.0.cmp(&unjustified_header.number()) {
Ordering::Equal => {
log::trace!(
target: "bridge",
"Managed to improve selected {} finality proof {:?} to {:?}.",
P::SOURCE_NAME,
maybe_justified_header.as_ref().map(|justified_header| justified_header.number()),
finality_proof.0
);
return Some(JustifiedHeader {
header: unjustified_header.clone(),
proof: finality_proof.1.clone(),
})
},
Ordering::Less => maybe_unjustified_header = unjustified_headers_iter.next(),
Ordering::Greater => {
maybe_finality_proof = finality_proofs_iter.next();
},
}
}
log::trace!(
target: "bridge",
"Could not improve selected {} finality proof {:?}.",
P::SOURCE_NAME,
maybe_justified_header.as_ref().map(|justified_header| justified_header.number())
);
maybe_justified_header
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::*;
#[test]
fn select_better_recent_finality_proof_works() {
// if there are no unjustified headers, nothing is changed
let finality_proofs_buf =
FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![(5, TestFinalityProof(5))]);
let justified_header =
JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
let selector = JustifiedHeaderSelector::Regular(vec![], justified_header.clone());
assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
// if there are no buffered finality proofs, nothing is changed
let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![]);
let justified_header =
JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
let selector = JustifiedHeaderSelector::Regular(
vec![TestSourceHeader(false, 5, 5)],
justified_header.clone(),
);
assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
// if there's no intersection between recent finality proofs and unjustified headers,
// nothing is changed
let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
(1, TestFinalityProof(1)),
(4, TestFinalityProof(4)),
]);
let justified_header =
JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
let selector = JustifiedHeaderSelector::Regular(
vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)],
justified_header.clone(),
);
assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
// if there's intersection between recent finality proofs and unjustified headers, but there
// are no proofs in this intersection, nothing is changed
let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
(7, TestFinalityProof(7)),
(11, TestFinalityProof(11)),
]);
let justified_header =
JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
let selector = JustifiedHeaderSelector::Regular(
vec![
TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10),
],
justified_header.clone(),
);
assert_eq!(selector.select(&finality_proofs_buf), Some(justified_header));
// if there's intersection between recent finality proofs and unjustified headers and
// there's a proof in this intersection:
// - this better (last from intersection) proof is selected;
// - 'obsolete' unjustified headers are pruned.
let finality_proofs_buf = FinalityProofsBuf::<TestFinalitySyncPipeline>::new(vec![
(7, TestFinalityProof(7)),
(9, TestFinalityProof(9)),
]);
let justified_header =
JustifiedHeader { header: TestSourceHeader(false, 2, 2), proof: TestFinalityProof(2) };
let selector = JustifiedHeaderSelector::Regular(
vec![
TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10),
],
justified_header,
);
assert_eq!(
selector.select(&finality_proofs_buf),
Some(JustifiedHeader {
header: TestSourceHeader(false, 9, 9),
proof: TestFinalityProof(9)
})
);
}
}
+39 -1
View File
@@ -26,11 +26,14 @@ pub use crate::{
};
use bp_header_chain::ConsensusLogReader;
use relay_utils::{FailedClient, MaybeConnectionError};
use std::fmt::Debug;
mod base;
mod finality_loop;
mod finality_loop_tests;
mod finality_proofs;
mod headers;
mod mock;
mod sync_loop_metrics;
/// Finality proofs synchronization pipeline.
@@ -50,3 +53,38 @@ pub trait SourceHeader<Hash, Number, Reader>: Clone + Debug + PartialEq + Send +
/// Returns true if this header needs to be submitted to target node.
fn is_mandatory(&self) -> bool;
}
/// Error that may happen inside finality synchronization loop.
#[derive(Debug)]
enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
/// Source client request has failed with given error.
Source(SourceError),
/// Target client request has failed with given error.
Target(TargetError),
/// Finality proof for mandatory header is missing from the source node.
MissingMandatoryFinalityProof(P::Number),
/// `submit_finality_proof` transaction failed
ProofSubmissionTxFailed {
#[allow(dead_code)]
submitted_number: P::Number,
#[allow(dead_code)]
best_number_at_target: P::Number,
},
/// `submit_finality_proof` transaction lost
ProofSubmissionTxLost,
}
impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
where
P: FinalitySyncPipeline,
SourceError: MaybeConnectionError,
TargetError: MaybeConnectionError,
{
fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
match *self {
Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
_ => Ok(()),
}
}
}
+209
View File
@@ -0,0 +1,209 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Tests for finality synchronization loop.
#![cfg(test)]
use crate::{
base::SourceClientBase,
finality_loop::{SourceClient, TargetClient},
FinalityPipeline, FinalitySyncPipeline, SourceHeader,
};
use async_trait::async_trait;
use bp_header_chain::{FinalityProof, GrandpaConsensusLogReader};
use futures::{Stream, StreamExt};
use parking_lot::Mutex;
use relay_utils::{
relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError, TrackedTransactionStatus,
TransactionTracker,
};
use std::{collections::HashMap, pin::Pin, sync::Arc};
type IsMandatory = bool;
pub type TestNumber = u64;
type TestHash = u64;
#[derive(Clone, Debug)]
pub struct TestTransactionTracker(pub TrackedTransactionStatus<HeaderId<TestHash, TestNumber>>);
impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
type HeaderId = HeaderId<TestHash, TestNumber>;
async fn wait(self) -> TrackedTransactionStatus<HeaderId<TestHash, TestNumber>> {
self.0
}
}
#[derive(Debug, Clone)]
pub enum TestError {
NonConnection,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct TestFinalitySyncPipeline;
impl FinalityPipeline for TestFinalitySyncPipeline {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type Hash = TestHash;
type Number = TestNumber;
type FinalityProof = TestFinalityProof;
}
impl FinalitySyncPipeline for TestFinalitySyncPipeline {
type ConsensusLogReader = GrandpaConsensusLogReader<TestNumber>;
type Header = TestSourceHeader;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestSourceHeader(pub IsMandatory, pub TestNumber, pub TestHash);
impl SourceHeader<TestHash, TestNumber, GrandpaConsensusLogReader<TestNumber>>
for TestSourceHeader
{
fn hash(&self) -> TestHash {
self.2
}
fn number(&self) -> TestNumber {
self.1
}
fn is_mandatory(&self) -> bool {
self.0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestFinalityProof(pub TestNumber);
impl FinalityProof<TestNumber> for TestFinalityProof {
fn target_header_number(&self) -> TestNumber {
self.0
}
}
#[derive(Debug, Clone, Default)]
pub struct ClientsData {
pub source_best_block_number: TestNumber,
pub source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
pub source_proofs: Vec<TestFinalityProof>,
pub target_best_block_id: HeaderId<TestHash, TestNumber>,
pub target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
pub target_transaction_tracker: TestTransactionTracker,
}
#[derive(Clone)]
pub struct TestSourceClient {
pub on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
pub data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestSourceClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl SourceClientBase<TestFinalitySyncPipeline> for TestSourceClient {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof> + 'static + Send>>;
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
Ok(futures::stream::iter(data.source_proofs.clone()).boxed())
}
}
#[async_trait]
impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
Ok(data.source_best_block_number)
}
async fn header_and_finality_proof(
&self,
number: TestNumber,
) -> Result<(TestSourceHeader, Option<TestFinalityProof>), TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
data.source_headers.get(&number).cloned().ok_or(TestError::NonConnection)
}
}
#[derive(Clone)]
pub struct TestTargetClient {
pub on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
pub data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestTargetClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
type TransactionTracker = TestTransactionTracker;
async fn best_finalized_source_block_id(
&self,
) -> Result<HeaderId<TestHash, TestNumber>, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
Ok(data.target_best_block_id)
}
async fn submit_finality_proof(
&self,
header: TestSourceHeader,
proof: TestFinalityProof,
) -> Result<TestTransactionTracker, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut data);
data.target_best_block_id = HeaderId(header.number(), header.hash());
data.target_headers.push((header, proof));
(self.on_method_call)(&mut data);
Ok(data.target_transaction_tracker.clone())
}
}
@@ -125,7 +125,7 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalitySource<P> {
Error,
> {
let client = self.client.clone();
let best_finalized_block_number = self.client.best_finalized_header_number().await?;
let best_finalized_block_number = client.best_finalized_header_number().await?;
Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move {
// if we've passed the `best_finalized_block_number`, we no longer need persistent
// justifications
+6 -6
View File
@@ -130,19 +130,19 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
match result {
Ok(()) => break,
Err(failed_client) =>
Err(failed_client) => {
log::debug!(target: "bridge", "Restarting relay loop");
reconnect_failed_client(
failed_client,
self.reconnect_delay,
&mut self.source_client,
&mut self.target_client,
)
.await,
.await
},
}
log::debug!(target: "bridge", "Restarting relay loop");
}
Ok(())
};
@@ -194,7 +194,7 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
Err(err) => {
log::trace!(
target: "bridge-metrics",
"Failed to create tokio runtime. Prometheus meterics are not available: {:?}",
"Failed to create tokio runtime. Prometheus metrics are not available: {:?}",
err,
);
return