More tests for finality relay (#816)

* more tests for finality relay

* clippy

* remove env_logger dep

* fmt

* more clippy

* removed prune_unjustified_headers

* review
This commit is contained in:
Svyatoslav Nikolsky
2021-03-11 16:33:01 +03:00
committed by Bastian Köcher
parent d8852fd197
commit 53cdf66071
7 changed files with 342 additions and 227 deletions
+173 -136
View File
@@ -136,16 +136,21 @@ pub fn run<P: FinalitySyncPipeline>(
} }
/// Unjustified headers container. Ordered by header number. /// Unjustified headers container. Ordered by header number.
pub(crate) type UnjustifiedHeaders<P> = Vec<<P as FinalitySyncPipeline>::Header>; pub(crate) type UnjustifiedHeaders<H> = Vec<H>;
/// Finality proofs container. Ordered by target header number. /// Finality proofs container. Ordered by target header number.
pub(crate) type FinalityProofs<P> = Vec<( pub(crate) type FinalityProofs<P> = Vec<(
<P as FinalitySyncPipeline>::Number, <P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof, <P as FinalitySyncPipeline>::FinalityProof,
)>; )>;
/// Reference to finality proofs container.
pub(crate) type FinalityProofsRef<'a, P> = &'a [(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)];
/// Error that may happen inside finality synchronization loop. /// Error that may happen inside finality synchronization loop.
#[derive(Debug)] #[derive(Debug)]
enum Error<P: FinalitySyncPipeline, SourceError, TargetError> { pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
/// Source client request has failed with given error. /// Source client request has failed with given error.
Source(SourceError), Source(SourceError),
/// Target client request has failed with given error. /// Target client request has failed with given error.
@@ -182,13 +187,23 @@ struct Transaction<Number> {
} }
/// Finality proofs stream that may be restarted. /// Finality proofs stream that may be restarted.
struct RestartableFinalityProofsStream<S> { pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted. /// Flag that the stream needs to be restarted.
needs_restart: bool, pub(crate) needs_restart: bool,
/// The stream itself. /// The stream itself.
stream: Pin<Box<S>>, stream: Pin<Box<S>>,
} }
#[cfg(test)]
impl<S> From<S> for RestartableFinalityProofsStream<S> {
fn from(stream: S) -> Self {
RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(stream),
}
}
}
/// Finality synchronization loop state. /// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress. /// Synchronization loop progress.
@@ -272,6 +287,8 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
} }
}; };
if finality_proofs_stream.needs_restart { if finality_proofs_stream.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
finality_proofs_stream.needs_restart = false; finality_proofs_stream.needs_restart = false;
finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?); finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
} }
@@ -368,7 +385,7 @@ where
async fn select_header_to_submit<P, SC, TC>( async fn select_header_to_submit<P, SC, TC>(
source_client: &SC, source_client: &SC,
_target_client: &TC, target_client: &TC,
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>, finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
recent_finality_proofs: &mut FinalityProofs<P>, recent_finality_proofs: &mut FinalityProofs<P>,
best_number_at_source: P::Number, best_number_at_source: P::Number,
@@ -380,9 +397,6 @@ where
SC: SourceClient<P>, SC: SourceClient<P>,
TC: TargetClient<P>, TC: TargetClient<P>,
{ {
let mut selected_finality_proof = None;
let mut unjustified_headers = Vec::new();
// to see that the loop is progressing // to see that the loop is progressing
log::trace!( log::trace!(
target: "bridge", target: "bridge",
@@ -393,92 +407,30 @@ where
// read missing headers. if we see that the header schedules GRANDPA change, we need to // read missing headers. if we see that the header schedules GRANDPA change, we need to
// submit this header // submit this header
let mut header_number = best_number_at_target + One::one(); let selected_finality_proof = read_missing_headers::<P, SC, TC>(
while header_number <= best_number_at_source { source_client,
let (header, finality_proof) = source_client target_client,
.header_and_finality_proof(header_number) best_number_at_source,
.await best_number_at_target,
.map_err(Error::Source)?; )
let is_mandatory = header.is_mandatory(); .await?;
let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
match (is_mandatory, finality_proof) { SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))),
(true, Some(finality_proof)) => { SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number); (unjustified_headers, Some((header, finality_proof)))
return Ok(Some((header, finality_proof)));
}
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
selected_finality_proof = Some((header, finality_proof));
prune_unjustified_headers::<P>(header_number, &mut unjustified_headers);
}
(false, None) => {
unjustified_headers.push(header);
}
} }
SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None),
};
header_number = header_number + One::one(); // all headers that are missing from the target client are non-mandatory
} // => even if we have already selected some header and its persistent finality proof,
// we may try to select better header by reading non-persistent proofs from the stream
// see if we can improve finality by using recent finality proofs read_finality_proofs_from_stream::<P, _>(finality_proofs_stream, recent_finality_proofs);
if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() { selected_finality_proof = select_better_recent_finality_proof::<P>(
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed"; recent_finality_proofs,
&mut unjustified_headers,
// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end selected_finality_proof,
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number(); );
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;
// find last proof from intersection
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
if intersection.contains(selected_header_number) {
// now remove all obsolete headers and extract selected header
let selected_header = prune_unjustified_headers::<P>(*selected_header_number, &mut unjustified_headers)
.expect("unjustified_headers contain all headers from intersection; qed");
selected_finality_proof = Some((selected_header, finality_proof.clone()));
}
}
// read all proofs from the stream, probably selecting updated proof that we're going to submit
loop {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Some(Some(finality_proof)) => finality_proof,
Some(None) => {
finality_proofs_stream.needs_restart = true;
break;
}
None => break,
};
let finality_proof_target_header_number = match finality_proof.target_header_number() {
Some(target_header_number) => target_header_number,
None => {
continue;
}
};
let justified_header =
prune_unjustified_headers::<P>(finality_proof_target_header_number, &mut unjustified_headers);
if let Some(justified_header) = justified_header {
recent_finality_proofs.clear();
selected_finality_proof = Some((justified_header, finality_proof));
} else {
// the number of proofs read during single wakeup is expected to be low, so we aren't pruning
// `recent_finality_proofs` collection too often
recent_finality_proofs.push((finality_proof_target_header_number, finality_proof));
}
}
// remove obsolete 'recent' finality proofs + keep its size under certain limit // remove obsolete 'recent' finality proofs + keep its size under certain limit
let oldest_finality_proof_to_keep = selected_finality_proof let oldest_finality_proof_to_keep = selected_finality_proof
@@ -494,62 +446,147 @@ where
Ok(selected_finality_proof) Ok(selected_finality_proof)
} }
/// Remove headers from `unjustified_headers` collection with number lower or equal than `justified_header_number`. /// Finality proof that has been selected by the `read_missing_headers` function.
pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
/// Mandatory header and its proof has been selected. We shall submit proof for this header.
Mandatory(Header, FinalityProof),
/// Regular header and its proof has been selected. We may submit this proof, or proof for
/// some better header.
Regular(UnjustifiedHeaders<Header>, Header, FinalityProof),
/// We haven't found any missing header with persistent proof at the target client.
None(UnjustifiedHeaders<Header>),
}
/// Read missing headers and their persistent finality proofs from the target client.
/// ///
/// Returns the header that matches `justified_header_number` (if any). /// If we have found some header with known proof, it is returned.
pub(crate) fn prune_unjustified_headers<P: FinalitySyncPipeline>( /// Otherwise, `SelectedFinalityProof::None` is returned.
justified_header_number: P::Number, ///
unjustified_headers: &mut UnjustifiedHeaders<P>, /// Unless we have found mandatory header, all missing headers are collected and returned.
) -> Option<P::Header> { pub(crate) async fn read_missing_headers<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>>(
prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| { source_client: &SC,
header.number() _target_client: &TC,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> Result<SelectedFinalityProof<P::Header, P::FinalityProof>, Error<P, SC::Error, TC::Error>> {
let mut unjustified_headers = Vec::new();
let mut selected_finality_proof = None;
let mut header_number = best_number_at_target + One::one();
while header_number <= best_number_at_source {
let (header, finality_proof) = source_client
.header_and_finality_proof(header_number)
.await
.map_err(Error::Source)?;
let is_mandatory = header.is_mandatory();
match (is_mandatory, finality_proof) {
(true, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
return Ok(SelectedFinalityProof::Mandatory(header, finality_proof));
}
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
unjustified_headers.clear();
selected_finality_proof = Some((header, finality_proof));
}
(false, None) => {
unjustified_headers.push(header);
}
}
header_number = header_number + One::one();
}
Ok(match selected_finality_proof {
Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof),
None => SelectedFinalityProof::None(unjustified_headers),
}) })
} }
/// Read finality proofs from the stream.
pub(crate) fn read_finality_proofs_from_stream<P: FinalitySyncPipeline, FPS: Stream<Item = P::FinalityProof>>(
finality_proofs_stream: &mut RestartableFinalityProofsStream<FPS>,
recent_finality_proofs: &mut FinalityProofs<P>,
) {
loop {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Some(Some(finality_proof)) => finality_proof,
Some(None) => {
finality_proofs_stream.needs_restart = true;
break;
}
None => break,
};
recent_finality_proofs.push((finality_proof.target_header_number(), finality_proof));
}
}
/// Try to select better header and its proof, given finality proofs that we
/// have recently read from the stream.
pub(crate) fn select_better_recent_finality_proof<P: FinalitySyncPipeline>(
recent_finality_proofs: FinalityProofsRef<P>,
unjustified_headers: &mut UnjustifiedHeaders<P::Header>,
selected_finality_proof: Option<(P::Header, P::FinalityProof)>,
) -> Option<(P::Header, P::FinalityProof)> {
if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() {
return selected_finality_proof;
}
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;
// find last proof from intersection
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
if !intersection.contains(selected_header_number) {
return selected_finality_proof;
}
// now remove all obsolete headers and extract selected header
let selected_header_position = unjustified_headers
.binary_search_by_key(selected_header_number, |header| header.number())
.expect("unjustified_headers contain all headers from intersection; qed");
let selected_header = unjustified_headers.swap_remove(selected_header_position);
Some((selected_header, finality_proof.clone()))
}
pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>( pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
justified_header_number: P::Number, justified_header_number: P::Number,
recent_finality_proofs: &mut FinalityProofs<P>, recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize, recent_finality_proofs_limit: usize,
) { ) {
prune_ordered_vec( let position =
justified_header_number, recent_finality_proofs.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number);
recent_finality_proofs,
recent_finality_proofs_limit, // remove all obsolete elements
|(header_number, _)| *header_number, *recent_finality_proofs = recent_finality_proofs.split_off(
position
.map(|position| position + 1)
.unwrap_or_else(|position| position),
); );
}
fn prune_ordered_vec<T, Number: relay_utils::BlockNumberBase>(
header_number: Number,
ordered_vec: &mut Vec<T>,
maximal_vec_size: usize,
extract_header_number: impl Fn(&T) -> Number,
) -> Option<T> {
let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number);
// first extract element we're interested in
let extracted_element = match position {
Ok(position) => {
let updated_vec = ordered_vec.split_off(position + 1);
let extracted_element = ordered_vec.pop().expect(
"binary_search_by_key has returned Ok(); so there's element at `position`;\
we're splitting vec at `position+1`; so we have pruned at least 1 element;\
qed",
);
*ordered_vec = updated_vec;
Some(extracted_element)
}
Err(position) => {
*ordered_vec = ordered_vec.split_off(position);
None
}
};
// now - limit vec by size // now - limit vec by size
let split_index = ordered_vec.len().saturating_sub(maximal_vec_size); let split_index = recent_finality_proofs
*ordered_vec = ordered_vec.split_off(split_index); .len()
.saturating_sub(recent_finality_proofs_limit);
extracted_element *recent_finality_proofs = recent_finality_proofs.split_off(split_index);
} }
fn print_sync_progress<P: FinalitySyncPipeline>( fn print_sync_progress<P: FinalitySyncPipeline>(
@@ -19,8 +19,8 @@
#![cfg(test)] #![cfg(test)]
use crate::finality_loop::{ use crate::finality_loop::{
prune_recent_finality_proofs, prune_unjustified_headers, run, FinalityProofs, FinalitySyncParams, SourceClient, prune_recent_finality_proofs, read_finality_proofs_from_stream, run, select_better_recent_finality_proof,
TargetClient, UnjustifiedHeaders, FinalityProofs, FinalitySyncParams, SourceClient, TargetClient,
}; };
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
@@ -71,10 +71,10 @@ impl SourceHeader<TestNumber> for TestSourceHeader {
} }
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
struct TestFinalityProof(Option<TestNumber>); struct TestFinalityProof(TestNumber);
impl FinalityProof<TestNumber> for TestFinalityProof { impl FinalityProof<TestNumber> for TestFinalityProof {
fn target_header_number(&self) -> Option<TestNumber> { fn target_header_number(&self) -> TestNumber {
self.0 self.0
} }
} }
@@ -176,14 +176,14 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
source_best_block_number: 10, source_best_block_number: 10,
source_headers: vec![ source_headers: vec![
(6, (TestSourceHeader(false, 6), None)), (6, (TestSourceHeader(false, 6), None)),
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(Some(7))))), (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))),
(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(Some(8))))), (8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))),
(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(Some(9))))), (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))),
(10, (TestSourceHeader(false, 10), None)), (10, (TestSourceHeader(false, 10), None)),
] ]
.into_iter() .into_iter()
.collect(), .collect(),
source_proofs: vec![TestFinalityProof(Some(12)), TestFinalityProof(Some(14))], source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
target_best_block_number: 5, target_best_block_number: 5,
target_headers: vec![], target_headers: vec![],
@@ -222,22 +222,22 @@ fn finality_sync_loop_works() {
// header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because
// there are no more persistent finality proofs // there are no more persistent finality proofs
// //
// once this ^^^ is done, we generate more blocks && read proof for blocks 12, 14 and 16 from the stream // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from the stream
// but we only submit proof for 16
//
// proof for block 15 is ignored - we haven't managed to decode it
if data.target_best_block_number == 9 { if data.target_best_block_number == 9 {
data.source_best_block_number = 17; data.source_best_block_number = 14;
data.source_headers.insert(11, (TestSourceHeader(false, 11), None)); data.source_headers.insert(11, (TestSourceHeader(false, 11), None));
data.source_headers data.source_headers
.insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(Some(12))))); .insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(12))));
data.source_headers.insert(13, (TestSourceHeader(false, 13), None)); data.source_headers.insert(13, (TestSourceHeader(false, 13), None));
data.source_headers data.source_headers
.insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(Some(14))))); .insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(14))));
}
// once this ^^^ is done, we generate more blocks && read persistent proof for block 16
if data.target_best_block_number == 14 {
data.source_best_block_number = 17;
data.source_headers.insert(15, (TestSourceHeader(false, 15), None));
data.source_headers data.source_headers
.insert(15, (TestSourceHeader(false, 15), Some(TestFinalityProof(None)))); .insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(16))));
data.source_headers
.insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(Some(16)))));
data.source_headers.insert(17, (TestSourceHeader(false, 17), None)); data.source_headers.insert(17, (TestSourceHeader(false, 17), None));
} }
@@ -247,67 +247,132 @@ fn finality_sync_loop_works() {
assert_eq!( assert_eq!(
client_data.target_headers, client_data.target_headers,
vec![ vec![
(TestSourceHeader(true, 8), TestFinalityProof(Some(8))), // before adding 11..14: finality proof for mandatory header#8
(TestSourceHeader(false, 9), TestFinalityProof(Some(9))), (TestSourceHeader(true, 8), TestFinalityProof(8)),
(TestSourceHeader(false, 16), TestFinalityProof(Some(16))), // before adding 11..14: persistent finality proof for non-mandatory header#9
(TestSourceHeader(false, 9), TestFinalityProof(9)),
// after adding 11..14: ephemeral finality proof for non-mandatory header#14
(TestSourceHeader(false, 14), TestFinalityProof(14)),
// after adding 15..17: persistent finality proof for non-mandatory header#16
(TestSourceHeader(false, 16), TestFinalityProof(16)),
], ],
); );
} }
#[test] #[test]
fn prune_unjustified_headers_works() { fn select_better_recent_finality_proof_works() {
let original_unjustified_headers: UnjustifiedHeaders<TestFinalitySyncPipeline> = vec![ // if there are no unjustified headers, nothing is changed
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(5, TestFinalityProof(5))],
&mut vec![],
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
// if there are no recent finality proofs, nothing is changed
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[],
&mut vec![TestSourceHeader(false, 5)],
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
// if there's no intersection between recent finality proofs and unjustified headers, nothing is changed
let mut unjustified_headers = vec![TestSourceHeader(false, 9), TestSourceHeader(false, 10)];
assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
);
// if there's intersection between recent finality proofs and unjustified headers, but there are no
// proofs in this intersection, nothing is changed
let mut unjustified_headers = vec![
TestSourceHeader(false, 8),
TestSourceHeader(false, 9),
TestSourceHeader(false, 10), TestSourceHeader(false, 10),
TestSourceHeader(false, 13), ];
TestSourceHeader(false, 15),
TestSourceHeader(false, 17),
TestSourceHeader(false, 19),
]
.into_iter()
.collect();
// when header is in the collection
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!( assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(10, &mut unjustified_headers), select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
Some(TestSourceHeader(false, 10)), &[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
); );
assert_eq!(&original_unjustified_headers[1..], unjustified_headers,);
// when the header doesn't exist in the collection
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!( assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(11, &mut unjustified_headers), unjustified_headers,
None, vec![
); TestSourceHeader(false, 8),
assert_eq!(&original_unjustified_headers[1..], unjustified_headers,); TestSourceHeader(false, 9),
TestSourceHeader(false, 10)
// when last entry is pruned ]
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(19, &mut unjustified_headers),
Some(TestSourceHeader(false, 19)),
); );
assert_eq!(&original_unjustified_headers[5..], unjustified_headers,); // if there's intersection between recent finality proofs and unjustified headers and there's
// a proof in this intersection:
// when we try and prune past last entry // - this better (last from intersection) proof is selected;
let mut unjustified_headers = original_unjustified_headers.clone(); // - 'obsolete' unjustified headers are pruned.
let mut unjustified_headers = vec![
TestSourceHeader(false, 8),
TestSourceHeader(false, 9),
TestSourceHeader(false, 10),
];
assert_eq!( assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(20, &mut unjustified_headers), select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
None, &[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))],
&mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))),
),
Some((TestSourceHeader(false, 9), TestFinalityProof(9))),
); );
assert_eq!(&original_unjustified_headers[5..], unjustified_headers,); }
#[test]
fn read_finality_proofs_from_stream_works() {
// when stream is currently empty, nothing is changed
let mut recent_finality_proofs = vec![(1, TestFinalityProof(1))];
let mut stream = futures::stream::pending().into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
assert_eq!(recent_finality_proofs, vec![(1, TestFinalityProof(1))]);
assert_eq!(stream.needs_restart, false);
// when stream has entry with target, it is added to the recent proofs container
let mut stream = futures::stream::iter(vec![TestFinalityProof(4)])
.chain(futures::stream::pending())
.into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
assert_eq!(
recent_finality_proofs,
vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
);
assert_eq!(stream.needs_restart, false);
// when stream has ended, we'll need to restart it
let mut stream = futures::stream::empty().into();
read_finality_proofs_from_stream::<TestFinalitySyncPipeline, _>(&mut stream, &mut recent_finality_proofs);
assert_eq!(
recent_finality_proofs,
vec![(1, TestFinalityProof(1)), (4, TestFinalityProof(4))]
);
assert_eq!(stream.needs_restart, true);
} }
#[test] #[test]
fn prune_recent_finality_proofs_works() { fn prune_recent_finality_proofs_works() {
let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![ let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![
(10, TestFinalityProof(Some(10))), (10, TestFinalityProof(10)),
(13, TestFinalityProof(Some(13))), (13, TestFinalityProof(13)),
(15, TestFinalityProof(Some(15))), (15, TestFinalityProof(15)),
(17, TestFinalityProof(Some(17))), (17, TestFinalityProof(17)),
(19, TestFinalityProof(Some(19))), (19, TestFinalityProof(19)),
] ]
.into_iter() .into_iter()
.collect(); .collect();
+2 -4
View File
@@ -53,8 +53,6 @@ pub trait SourceHeader<Number>: Clone + Debug + PartialEq + Send + Sync {
/// Abstract finality proof that is justifying block finality. /// Abstract finality proof that is justifying block finality.
pub trait FinalityProof<Number>: Clone + Send + Sync + Debug { pub trait FinalityProof<Number>: Clone + Send + Sync + Debug {
/// Return header id that this proof is generated for. /// Return number of header that this proof is generated for.
/// fn target_header_number(&self) -> Number;
/// None is returned if proof is invalid from relayer PoV.
fn target_header_number(&self) -> Option<Number>;
} }
@@ -22,6 +22,7 @@ use crate::error::Error;
use crate::sync_header::SyncHeader; use crate::sync_header::SyncHeader;
use async_trait::async_trait; use async_trait::async_trait;
use bp_header_chain::justification::decode_justification_target;
use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader}; use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader};
use futures::stream::{unfold, Stream, StreamExt}; use futures::stream::{unfold, Stream, StreamExt};
use relay_utils::relay_loop::Client as RelayClient; use relay_utils::relay_loop::Client as RelayClient;
@@ -30,26 +31,23 @@ use std::{marker::PhantomData, pin::Pin};
/// Wrapped raw Justification. /// Wrapped raw Justification.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Justification<Header> { pub struct Justification<Number> {
/// Header number decoded from the [`raw_justification`].
target_header_number: Number,
/// Raw, encoded justification bytes.
raw_justification: sp_runtime::Justification, raw_justification: sp_runtime::Justification,
_phantom: PhantomData<Header>,
} }
impl<Header> Justification<Header> { impl<Number> Justification<Number> {
/// Extract raw justification. /// Extract raw justification.
pub fn into_inner(self) -> sp_runtime::Justification { pub fn into_inner(self) -> sp_runtime::Justification {
self.raw_justification self.raw_justification
} }
} }
impl<Header> FinalityProof<Header::Number> for Justification<Header> impl<Number: relay_utils::BlockNumberBase> FinalityProof<Number> for Justification<Number> {
where fn target_header_number(&self) -> Number {
Header: HeaderT, self.target_header_number
{
fn target_header_number(&self) -> Option<Header::Number> {
bp_header_chain::justification::decode_justification_target::<Header>(&self.raw_justification)
.ok()
.map(|(_, number)| number)
} }
} }
@@ -96,11 +94,11 @@ where
Hash = C::Hash, Hash = C::Hash,
Number = C::BlockNumber, Number = C::BlockNumber,
Header = SyncHeader<C::Header>, Header = SyncHeader<C::Header>,
FinalityProof = Justification<C::Header>, FinalityProof = Justification<C::BlockNumber>,
>, >,
P::Header: SourceHeader<C::BlockNumber>, P::Header: SourceHeader<C::BlockNumber>,
{ {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::Header>>>>; type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>>>>;
async fn best_finalized_block_number(&self) -> Result<P::Number, Error> { async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because // we **CAN** continue to relay finality proofs if source node is out of sync, because
@@ -122,8 +120,8 @@ where
.justification() .justification()
.cloned() .cloned()
.map(|raw_justification| Justification { .map(|raw_justification| Justification {
target_header_number: number,
raw_justification, raw_justification,
_phantom: Default::default(),
}), }),
)) ))
} }
@@ -132,14 +130,31 @@ where
Ok(unfold( Ok(unfold(
self.client.clone().subscribe_justifications().await?, self.client.clone().subscribe_justifications().await?,
move |mut subscription| async move { move |mut subscription| async move {
let next_justification = subscription.next().await?; loop {
Some(( let next_justification = subscription.next().await?;
Justification { let decoded_target = decode_justification_target::<C::Header>(&next_justification.0);
raw_justification: next_justification.0, let target_header_number = match decoded_target {
_phantom: Default::default(), Ok((_, number)) => number,
}, Err(err) => {
subscription, log::error!(
)) target: "bridge",
"Failed to decode justification target from the {} justifications stream: {:?}",
P::SOURCE_NAME,
err,
);
continue;
}
};
return Some((
Justification {
target_header_number,
raw_justification: next_justification.0,
},
subscription,
));
}
}, },
) )
.boxed()) .boxed())
@@ -89,7 +89,7 @@ where
type Hash = HashOf<SourceChain>; type Hash = HashOf<SourceChain>;
type Number = BlockNumberOf<SourceChain>; type Number = BlockNumberOf<SourceChain>;
type Header = SyncHeader<SourceChain::Header>; type Header = SyncHeader<SourceChain::Header>;
type FinalityProof = Justification<SourceChain::Header>; type FinalityProof = Justification<SourceChain::BlockNumber>;
} }
/// Run Substrate-to-Substrate finality sync. /// Run Substrate-to-Substrate finality sync.
@@ -103,7 +103,7 @@ pub async fn run<SourceChain, TargetChain, P>(
Hash = HashOf<SourceChain>, Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>, Number = BlockNumberOf<SourceChain>,
Header = SyncHeader<SourceChain::Header>, Header = SyncHeader<SourceChain::Header>,
FinalityProof = Justification<SourceChain::Header>, FinalityProof = Justification<SourceChain::BlockNumber>,
>, >,
SourceChain: Clone + Chain, SourceChain: Clone + Chain,
BlockNumberOf<SourceChain>: BlockNumberBase, BlockNumberOf<SourceChain>: BlockNumberBase,
@@ -39,7 +39,7 @@ impl SubstrateFinalitySyncPipeline for MillauFinalityToRialto {
async fn make_submit_finality_proof_transaction( async fn make_submit_finality_proof_transaction(
&self, &self,
header: MillauSyncHeader, header: MillauSyncHeader,
proof: Justification<bp_millau::Header>, proof: Justification<bp_millau::BlockNumber>,
) -> Result<Self::SignedTransaction, SubstrateError> { ) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?; let nonce = self.target_client.next_account_index(account_id).await?;
@@ -39,7 +39,7 @@ impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau {
async fn make_submit_finality_proof_transaction( async fn make_submit_finality_proof_transaction(
&self, &self,
header: RialtoSyncHeader, header: RialtoSyncHeader,
proof: Justification<bp_rialto::Header>, proof: Justification<bp_rialto::BlockNumber>,
) -> Result<Self::SignedTransaction, SubstrateError> { ) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into(); let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?; let nonce = self.target_client.next_account_index(account_id).await?;