Using-same-fork metric for finality and complex relay (#1327)

* using_same_fork metric in finality relay

* support `using_different_forks` in messages relay

* added dashboards and alerts

* lockfile
This commit is contained in:
Svyatoslav Nikolsky
2022-02-24 13:19:06 +03:00
committed by Bastian Köcher
parent 36443f3d54
commit e29b590785
14 changed files with 397 additions and 198 deletions
@@ -44,7 +44,11 @@ impl<Header> From<Header> for SyncHeader<Header> {
} }
} }
impl<Header: HeaderT> FinalitySourceHeader<Header::Number> for SyncHeader<Header> { impl<Header: HeaderT> FinalitySourceHeader<Header::Hash, Header::Number> for SyncHeader<Header> {
fn hash(&self) -> Header::Hash {
self.0.hash()
}
fn number(&self) -> Header::Number { fn number(&self) -> Header::Number {
*self.0.number() *self.0.number()
} }
+55 -14
View File
@@ -29,7 +29,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt};
use num_traits::{One, Saturating}; use num_traits::{One, Saturating};
use relay_utils::{ use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
MaybeConnectionError, HeaderId, MaybeConnectionError,
}; };
use std::{ use std::{
pin::Pin, pin::Pin,
@@ -87,7 +87,9 @@ pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
#[async_trait] #[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient { pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
/// Get best finalized source block number. /// Get best finalized source block number.
async fn best_finalized_source_block_number(&self) -> Result<P::Number, Self::Error>; async fn best_finalized_source_block_id(
&self,
) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;
/// Submit header finality proof. /// Submit header finality proof.
async fn submit_finality_proof( async fn submit_finality_proof(
@@ -114,7 +116,11 @@ pub async fn run<P: FinalitySyncPipeline>(
let exit_signal = exit_signal.shared(); let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client) relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params) .with_metrics(metrics_params)
.loop_metric(SyncLoopMetrics::new(Some(&metrics_prefix::<P>()))?)? .loop_metric(SyncLoopMetrics::new(
Some(&metrics_prefix::<P>()),
"source",
"source_at_target",
)?)?
.expose() .expose()
.await? .await?
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| { .run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
@@ -169,7 +175,7 @@ where
/// Information about transaction that we have submitted. /// Information about transaction that we have submitted.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Transaction<Number> { pub(crate) struct Transaction<Number> {
/// Time when we have submitted this transaction. /// Time when we have submitted this transaction.
pub time: Instant, pub time: Instant,
/// The number of the header we have submitted. /// The number of the header we have submitted.
@@ -181,7 +187,7 @@ pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted. /// Flag that the stream needs to be restarted.
pub(crate) needs_restart: bool, pub(crate) needs_restart: bool,
/// The stream itself. /// The stream itself.
stream: Pin<Box<S>>, pub(crate) stream: Pin<Box<S>>,
} }
#[cfg(test)] #[cfg(test)]
@@ -192,15 +198,16 @@ impl<S> From<S> for RestartableFinalityProofsStream<S> {
} }
/// Finality synchronization loop state. /// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress. /// Synchronization loop progress.
progress: &'a mut (Instant, Option<P::Number>), pub(crate) progress: &'a mut (Instant, Option<P::Number>),
/// Finality proofs stream. /// Finality proofs stream.
finality_proofs_stream: &'a mut RestartableFinalityProofsStream<FinalityProofsStream>, pub(crate) finality_proofs_stream:
&'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
/// Recent finality proofs that we have read from the stream. /// Recent finality proofs that we have read from the stream.
recent_finality_proofs: &'a mut FinalityProofs<P>, pub(crate) recent_finality_proofs: &'a mut FinalityProofs<P>,
/// Last transaction that we have submitted to the target node. /// Last transaction that we have submitted to the target node.
last_transaction: Option<Transaction<P::Number>>, pub(crate) last_transaction: Option<Transaction<P::Number>>,
} }
async fn run_until_connection_lost<P: FinalitySyncPipeline>( async fn run_until_connection_lost<P: FinalitySyncPipeline>(
@@ -280,7 +287,7 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
} }
} }
async fn run_loop_iteration<P, SC, TC>( pub(crate) async fn run_loop_iteration<P, SC, TC>(
source_client: &SC, source_client: &SC,
target_client: &TC, target_client: &TC,
state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
@@ -295,13 +302,31 @@ where
// read best source headers ids from source and target nodes // read best source headers ids from source and target nodes
let best_number_at_source = let best_number_at_source =
source_client.best_finalized_block_number().await.map_err(Error::Source)?; source_client.best_finalized_block_number().await.map_err(Error::Source)?;
let best_number_at_target = target_client let best_id_at_target =
.best_finalized_source_block_number() target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
let best_number_at_target = best_id_at_target.0;
let different_hash_at_source = ensure_same_fork::<P, _>(&best_id_at_target, source_client)
.await .await
.map_err(Error::Target)?; .map_err(Error::Source)?;
let using_same_fork = different_hash_at_source.is_none();
if let Some(ref different_hash_at_source) = different_hash_at_source {
log::error!(
target: "bridge",
"Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
at-source {:?} vs at-target {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_number_at_target,
different_hash_at_source,
best_id_at_target.1,
);
}
if let Some(ref metrics_sync) = *metrics_sync { if let Some(ref metrics_sync) = *metrics_sync {
metrics_sync.update_best_block_at_source(best_number_at_source); metrics_sync.update_best_block_at_source(best_number_at_source);
metrics_sync.update_best_block_at_target(best_number_at_target); metrics_sync.update_best_block_at_target(best_number_at_target);
metrics_sync.update_using_same_fork(using_same_fork);
} }
*state.progress = *state.progress =
print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target); print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);
@@ -427,6 +452,22 @@ where
Ok(selected_finality_proof) Ok(selected_finality_proof)
} }
/// Ensures that both clients are on the same fork.
///
/// Returns `Some(_)` with header has at the source client if headers are different.
async fn ensure_same_fork<P: FinalitySyncPipeline, SC: SourceClient<P>>(
best_id_at_target: &HeaderId<P::Hash, P::Number>,
source_client: &SC,
) -> Result<Option<P::Hash>, SC::Error> {
let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0;
let header_hash_at_source = header_at_source.hash();
Ok(if best_id_at_target.1 == header_hash_at_source {
None
} else {
Some(header_hash_at_source)
})
}
/// Finality proof that has been selected by the `read_missing_headers` function. /// Finality proof that has been selected by the `read_missing_headers` function.
pub(crate) enum SelectedFinalityProof<Header, FinalityProof> { pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
/// Mandatory header and its proof has been selected. We shall submit proof for this header. /// Mandatory header and its proof has been selected. We shall submit proof for this header.
@@ -20,10 +20,12 @@
use crate::{ use crate::{
finality_loop::{ finality_loop::{
prune_recent_finality_proofs, read_finality_proofs_from_stream, run, prune_recent_finality_proofs, read_finality_proofs_from_stream, run, run_loop_iteration,
select_better_recent_finality_proof, select_header_to_submit, FinalityProofs, select_better_recent_finality_proof, select_header_to_submit, FinalityLoopState,
FinalitySyncParams, RestartableFinalityProofsStream, SourceClient, TargetClient, FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, SourceClient,
TargetClient,
}, },
sync_loop_metrics::SyncLoopMetrics,
FinalityProof, FinalitySyncPipeline, SourceHeader, FinalityProof, FinalitySyncPipeline, SourceHeader,
}; };
@@ -31,12 +33,18 @@ use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt}; use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use relay_utils::{ use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, MaybeConnectionError, metrics::MetricsParams, relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError,
};
use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
}; };
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
type IsMandatory = bool; type IsMandatory = bool;
type TestNumber = u64; type TestNumber = u64;
type TestHash = u64;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
enum TestError { enum TestError {
@@ -56,16 +64,20 @@ impl FinalitySyncPipeline for TestFinalitySyncPipeline {
const SOURCE_NAME: &'static str = "TestSource"; const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget"; const TARGET_NAME: &'static str = "TestTarget";
type Hash = u64; type Hash = TestHash;
type Number = TestNumber; type Number = TestNumber;
type Header = TestSourceHeader; type Header = TestSourceHeader;
type FinalityProof = TestFinalityProof; type FinalityProof = TestFinalityProof;
} }
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
struct TestSourceHeader(IsMandatory, TestNumber); struct TestSourceHeader(IsMandatory, TestNumber, TestHash);
impl SourceHeader<TestHash, TestNumber> for TestSourceHeader {
fn hash(&self) -> TestHash {
self.2
}
impl SourceHeader<TestNumber> for TestSourceHeader {
fn number(&self) -> TestNumber { fn number(&self) -> TestNumber {
self.1 self.1
} }
@@ -90,7 +102,7 @@ struct ClientsData {
source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>, source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
source_proofs: Vec<TestFinalityProof>, source_proofs: Vec<TestFinalityProof>,
target_best_block_number: TestNumber, target_best_block_id: HeaderId<TestHash, TestNumber>,
target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
} }
@@ -152,10 +164,12 @@ impl RelayClient for TestTargetClient {
#[async_trait] #[async_trait]
impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient { impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
async fn best_finalized_source_block_number(&self) -> Result<TestNumber, TestError> { async fn best_finalized_source_block_id(
&self,
) -> Result<HeaderId<TestHash, TestNumber>, TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
(self.on_method_call)(&mut *data); (self.on_method_call)(&mut *data);
Ok(data.target_best_block_number) Ok(data.target_best_block_id)
} }
async fn submit_finality_proof( async fn submit_finality_proof(
@@ -165,7 +179,7 @@ impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
) -> Result<(), TestError> { ) -> Result<(), TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
(self.on_method_call)(&mut *data); (self.on_method_call)(&mut *data);
data.target_best_block_number = header.number(); data.target_best_block_id = HeaderId(header.number(), header.hash());
data.target_headers.push((header, proof)); data.target_headers.push((header, proof));
Ok(()) Ok(())
} }
@@ -187,7 +201,7 @@ fn prepare_test_clients(
source_headers, source_headers,
source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)],
target_best_block_number: 5, target_best_block_id: HeaderId(5, 5),
target_headers: vec![], target_headers: vec![],
})); }));
( (
@@ -199,6 +213,15 @@ fn prepare_test_clients(
) )
} }
fn test_sync_params() -> FinalitySyncParams {
FinalitySyncParams {
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
only_mandatory_headers: false,
}
}
fn run_sync_loop( fn run_sync_loop(
state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static,
) -> ClientsData { ) -> ClientsData {
@@ -207,21 +230,17 @@ fn run_sync_loop(
exit_sender, exit_sender,
state_function, state_function,
vec![ vec![
(6, (TestSourceHeader(false, 6), None)), (5, (TestSourceHeader(false, 5, 5), None)),
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), (6, (TestSourceHeader(false, 6, 6), None)),
(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))), (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))), (8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))),
(10, (TestSourceHeader(false, 10), None)), (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
(10, (TestSourceHeader(false, 10, 10), None)),
] ]
.into_iter() .into_iter()
.collect(), .collect(),
); );
let sync_params = FinalitySyncParams { let sync_params = test_sync_params();
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
only_mandatory_headers: false,
};
let clients_data = source_client.data.clone(); let clients_data = source_client.data.clone();
let _ = async_std::task::block_on(run( let _ = async_std::task::block_on(run(
@@ -246,38 +265,38 @@ fn finality_sync_loop_works() {
// //
// once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from
// the stream // the stream
if data.target_best_block_number == 9 { if data.target_best_block_id.0 == 9 {
data.source_best_block_number = 14; data.source_best_block_number = 14;
data.source_headers.insert(11, (TestSourceHeader(false, 11), None)); data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None));
data.source_headers data.source_headers
.insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(12)))); .insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12))));
data.source_headers.insert(13, (TestSourceHeader(false, 13), None)); data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None));
data.source_headers data.source_headers
.insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(14)))); .insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14))));
} }
// once this ^^^ is done, we generate more blocks && read persistent proof for block 16 // once this ^^^ is done, we generate more blocks && read persistent proof for block 16
if data.target_best_block_number == 14 { if data.target_best_block_id.0 == 14 {
data.source_best_block_number = 17; data.source_best_block_number = 17;
data.source_headers.insert(15, (TestSourceHeader(false, 15), None)); data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None));
data.source_headers data.source_headers
.insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(16)))); .insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16))));
data.source_headers.insert(17, (TestSourceHeader(false, 17), None)); data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None));
} }
data.target_best_block_number == 16 data.target_best_block_id.0 == 16
}); });
assert_eq!( assert_eq!(
client_data.target_headers, client_data.target_headers,
vec![ vec![
// before adding 11..14: finality proof for mandatory header#8 // before adding 11..14: finality proof for mandatory header#8
(TestSourceHeader(true, 8), TestFinalityProof(8)), (TestSourceHeader(true, 8, 8), TestFinalityProof(8)),
// before adding 11..14: persistent finality proof for non-mandatory header#9 // before adding 11..14: persistent finality proof for non-mandatory header#9
(TestSourceHeader(false, 9), TestFinalityProof(9)), (TestSourceHeader(false, 9, 9), TestFinalityProof(9)),
// after adding 11..14: ephemeral finality proof for non-mandatory header#14 // after adding 11..14: ephemeral finality proof for non-mandatory header#14
(TestSourceHeader(false, 14), TestFinalityProof(14)), (TestSourceHeader(false, 14, 14), TestFinalityProof(14)),
// after adding 15..17: persistent finality proof for non-mandatory header#16 // after adding 15..17: persistent finality proof for non-mandatory header#16
(TestSourceHeader(false, 16), TestFinalityProof(16)), (TestSourceHeader(false, 16, 16), TestFinalityProof(16)),
], ],
); );
} }
@@ -291,11 +310,11 @@ fn run_only_mandatory_headers_mode_test(
exit_sender, exit_sender,
|_| false, |_| false,
vec![ vec![
(6, (TestSourceHeader(false, 6), Some(TestFinalityProof(6)))), (6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))),
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))),
(8, (TestSourceHeader(has_mandatory_headers, 8), Some(TestFinalityProof(8)))), (8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))),
(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))), (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))),
(10, (TestSourceHeader(false, 10), Some(TestFinalityProof(10)))), (10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))),
] ]
.into_iter() .into_iter()
.collect(), .collect(),
@@ -322,7 +341,7 @@ fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_heade
assert_eq!(run_only_mandatory_headers_mode_test(true, false), None); assert_eq!(run_only_mandatory_headers_mode_test(true, false), None);
assert_eq!( assert_eq!(
run_only_mandatory_headers_mode_test(false, false), run_only_mandatory_headers_mode_test(false, false),
Some((TestSourceHeader(false, 10), TestFinalityProof(10))), Some((TestSourceHeader(false, 10, 10), TestFinalityProof(10))),
); );
} }
@@ -330,11 +349,11 @@ fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_heade
fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() { fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() {
assert_eq!( assert_eq!(
run_only_mandatory_headers_mode_test(true, true), run_only_mandatory_headers_mode_test(true, true),
Some((TestSourceHeader(true, 8), TestFinalityProof(8))), Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))),
); );
assert_eq!( assert_eq!(
run_only_mandatory_headers_mode_test(false, true), run_only_mandatory_headers_mode_test(false, true),
Some((TestSourceHeader(true, 8), TestFinalityProof(8))), Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))),
); );
} }
@@ -345,63 +364,74 @@ fn select_better_recent_finality_proof_works() {
select_better_recent_finality_proof::<TestFinalitySyncPipeline>( select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(5, TestFinalityProof(5))], &[(5, TestFinalityProof(5))],
&mut vec![], &mut vec![],
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
), ),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
); );
// if there are no recent finality proofs, nothing is changed // if there are no recent finality proofs, nothing is changed
assert_eq!( assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>( select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[], &[],
&mut vec![TestSourceHeader(false, 5)], &mut vec![TestSourceHeader(false, 5, 5)],
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
), ),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
); );
// if there's no intersection between recent finality proofs and unjustified headers, nothing is // if there's no intersection between recent finality proofs and unjustified headers, nothing is
// changed // changed
let mut unjustified_headers = vec![TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; let mut unjustified_headers =
vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)];
assert_eq!( assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>( select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))], &[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))],
&mut unjustified_headers, &mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
), ),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
); );
// if there's intersection between recent finality proofs and unjustified headers, but there are // if there's intersection between recent finality proofs and unjustified headers, but there are
// no proofs in this intersection, nothing is changed // no proofs in this intersection, nothing is changed
let mut unjustified_headers = let mut unjustified_headers = vec![
vec![TestSourceHeader(false, 8), TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10),
];
assert_eq!( assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>( select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))], &[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))],
&mut unjustified_headers, &mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
), ),
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
); );
assert_eq!( assert_eq!(
unjustified_headers, unjustified_headers,
vec![TestSourceHeader(false, 8), TestSourceHeader(false, 9), TestSourceHeader(false, 10)] vec![
TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10)
]
); );
// if there's intersection between recent finality proofs and unjustified headers and there's // if there's intersection between recent finality proofs and unjustified headers and there's
// a proof in this intersection: // a proof in this intersection:
// - this better (last from intersection) proof is selected; // - this better (last from intersection) proof is selected;
// - 'obsolete' unjustified headers are pruned. // - 'obsolete' unjustified headers are pruned.
let mut unjustified_headers = let mut unjustified_headers = vec![
vec![TestSourceHeader(false, 8), TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; TestSourceHeader(false, 8, 8),
TestSourceHeader(false, 9, 9),
TestSourceHeader(false, 10, 10),
];
assert_eq!( assert_eq!(
select_better_recent_finality_proof::<TestFinalitySyncPipeline>( select_better_recent_finality_proof::<TestFinalitySyncPipeline>(
&[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))], &[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))],
&mut unjustified_headers, &mut unjustified_headers,
Some((TestSourceHeader(false, 2), TestFinalityProof(2))), Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))),
), ),
Some((TestSourceHeader(false, 9), TestFinalityProof(9))), Some((TestSourceHeader(false, 9, 9), TestFinalityProof(9))),
); );
} }
@@ -475,3 +505,45 @@ fn prune_recent_finality_proofs_works() {
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(20, &mut recent_finality_proofs, 2); prune_recent_finality_proofs::<TestFinalitySyncPipeline>(20, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,); assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
} }
#[test]
fn different_forks_at_source_and_at_target_are_detected() {
let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded();
let (source_client, target_client) = prepare_test_clients(
exit_sender,
|_| false,
vec![
(5, (TestSourceHeader(false, 5, 42), None)),
(6, (TestSourceHeader(false, 6, 6), None)),
(7, (TestSourceHeader(false, 7, 7), None)),
(8, (TestSourceHeader(false, 8, 8), None)),
(9, (TestSourceHeader(false, 9, 9), None)),
(10, (TestSourceHeader(false, 10, 10), None)),
]
.into_iter()
.collect(),
);
let mut progress = (Instant::now(), None);
let mut finality_proofs_stream = RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(futures::stream::iter(vec![]).boxed()),
};
let mut recent_finality_proofs = Vec::new();
let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
async_std::task::block_on(run_loop_iteration::<TestFinalitySyncPipeline, _, _>(
&source_client,
&target_client,
FinalityLoopState {
progress: &mut progress,
finality_proofs_stream: &mut finality_proofs_stream,
recent_finality_proofs: &mut recent_finality_proofs,
last_transaction: None,
},
&test_sync_params(),
&Some(metrics_sync.clone()),
))
.unwrap();
assert!(!metrics_sync.is_using_same_fork());
}
+7 -4
View File
@@ -19,8 +19,9 @@
//! are still submitted to the target node, but are treated as auxiliary data as we are not trying //! are still submitted to the target node, but are treated as auxiliary data as we are not trying
//! to submit all source headers to the target node. //! to submit all source headers to the target node.
pub use crate::finality_loop::{ pub use crate::{
metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient, finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient},
sync_loop_metrics::SyncLoopMetrics,
}; };
use bp_header_chain::FinalityProof; use bp_header_chain::FinalityProof;
@@ -42,13 +43,15 @@ pub trait FinalitySyncPipeline: 'static + Clone + Debug + Send + Sync {
/// Headers we're syncing are identified by this number. /// Headers we're syncing are identified by this number.
type Number: relay_utils::BlockNumberBase; type Number: relay_utils::BlockNumberBase;
/// Type of header that we're syncing. /// Type of header that we're syncing.
type Header: SourceHeader<Self::Number>; type Header: SourceHeader<Self::Hash, Self::Number>;
/// Finality proof type. /// Finality proof type.
type FinalityProof: FinalityProof<Self::Number>; type FinalityProof: FinalityProof<Self::Number>;
} }
/// Header that we're receiving from source node. /// Header that we're receiving from source node.
pub trait SourceHeader<Number>: Clone + Debug + PartialEq + Send + Sync { pub trait SourceHeader<Hash, Number>: Clone + Debug + PartialEq + Send + Sync {
/// Returns hash of header.
fn hash(&self) -> Hash;
/// Returns number of header. /// Returns number of header.
fn number(&self) -> Number; fn number(&self) -> Number;
/// Returns true if this header needs to be submitted to target node. /// Returns true if this header needs to be submitted to target node.
@@ -16,49 +16,71 @@
//! Metrics for headers synchronization relay loop. //! Metrics for headers synchronization relay loop.
use relay_utils::metrics::{ use relay_utils::metrics::{metric_name, register, IntGauge, Metric, PrometheusError, Registry};
metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64,
};
/// Headers sync metrics. /// Headers sync metrics.
#[derive(Clone)] #[derive(Clone)]
pub struct SyncLoopMetrics { pub struct SyncLoopMetrics {
/// Best syncing headers at "source" and "target" nodes. /// Best syncing header at the source.
best_block_numbers: GaugeVec<U64>, best_source_block_number: IntGauge,
/// Best syncing header at the target.
best_target_block_number: IntGauge,
/// Flag that has `0` value when best source headers at the source node and at-target-chain
/// are matching and `1` otherwise.
using_different_forks: IntGauge,
} }
impl SyncLoopMetrics { impl SyncLoopMetrics {
/// Create and register headers loop metrics. /// Create and register headers loop metrics.
pub fn new(prefix: Option<&str>) -> Result<Self, PrometheusError> { pub fn new(
prefix: Option<&str>,
at_source_chain_label: &str,
at_target_chain_label: &str,
) -> Result<Self, PrometheusError> {
Ok(SyncLoopMetrics { Ok(SyncLoopMetrics {
best_block_numbers: GaugeVec::new( best_source_block_number: IntGauge::new(
Opts::new( metric_name(prefix, &format!("best_{}_block_number", at_source_chain_label)),
metric_name(prefix, "best_block_numbers"), format!("Best block number at the {}", at_source_chain_label),
"Best block numbers on source and target nodes", )?,
), best_target_block_number: IntGauge::new(
&["node"], metric_name(prefix, &format!("best_{}_block_number", at_target_chain_label)),
format!("Best block number at the {}", at_target_chain_label),
)?,
using_different_forks: IntGauge::new(
metric_name(prefix, &format!("is_{}_and_{}_using_different_forks", at_source_chain_label, at_target_chain_label)),
"Whether the best finalized source block at target node is different (value 1) from the \
corresponding block at the source node",
)?, )?,
}) })
} }
/// Returns current value of the using-same-fork flag.
#[cfg(test)]
pub(crate) fn is_using_same_fork(&self) -> bool {
self.using_different_forks.get() == 0
}
/// Update best block number at source. /// Update best block number at source.
pub fn update_best_block_at_source<Number: Into<u64>>(&self, source_best_number: Number) { pub fn update_best_block_at_source<Number: Into<u64>>(&self, source_best_number: Number) {
self.best_block_numbers self.best_source_block_number.set(source_best_number.into());
.with_label_values(&["source"])
.set(source_best_number.into());
} }
/// Update best block number at target. /// Update best block number at target.
pub fn update_best_block_at_target<Number: Into<u64>>(&self, target_best_number: Number) { pub fn update_best_block_at_target<Number: Into<u64>>(&self, target_best_number: Number) {
self.best_block_numbers self.best_target_block_number.set(target_best_number.into());
.with_label_values(&["target"]) }
.set(target_best_number.into());
/// Update using-same-fork flag.
pub fn update_using_same_fork(&self, using_same_fork: bool) {
self.using_different_forks.set(if using_same_fork { 0 } else { 1 })
} }
} }
impl Metric for SyncLoopMetrics { impl Metric for SyncLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> { fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
register(self.best_block_numbers.clone(), registry)?; register(self.best_source_block_number.clone(), registry)?;
register(self.best_target_block_number.clone(), registry)?;
register(self.using_different_forks.clone(), registry)?;
Ok(()) Ok(())
} }
} }
@@ -30,8 +30,8 @@ use bp_header_chain::{justification::GrandpaJustification, storage_keys::is_halt
use codec::Encode; use codec::Encode;
use finality_relay::TargetClient; use finality_relay::TargetClient;
use relay_substrate_client::{ use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, Client, Error, HashOf, AccountIdOf, AccountKeyPairOf, Chain, ChainWithGrandpa, Client, Error, HeaderIdOf, HeaderOf,
HeaderOf, SignParam, SyncHeader, TransactionEra, TransactionSignScheme, UnsignedTransaction, SignParam, SyncHeader, TransactionEra, TransactionSignScheme, UnsignedTransaction,
}; };
use relay_utils::relay_loop::Client as RelayClient; use relay_utils::relay_loop::Client as RelayClient;
use sp_core::{Bytes, Pair}; use sp_core::{Bytes, Pair};
@@ -90,23 +90,20 @@ where
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>, AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TransactionSignScheme> as Pair>::Public>,
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>, P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
{ {
async fn best_finalized_source_block_number( async fn best_finalized_source_block_id(&self) -> Result<HeaderIdOf<P::SourceChain>, Error> {
&self,
) -> Result<BlockNumberOf<P::SourceChain>, Error> {
// we can't continue to relay finality if target node is out of sync, because // we can't continue to relay finality if target node is out of sync, because
// it may have already received (some of) headers that we're going to relay // it may have already received (some of) headers that we're going to relay
self.client.ensure_synced().await?; self.client.ensure_synced().await?;
// we can't relay finality if GRANDPA pallet at target chain is halted // we can't relay finality if GRANDPA pallet at target chain is halted
self.ensure_pallet_active().await?; self.ensure_pallet_active().await?;
Ok(crate::messages_source::read_client_state::< Ok(crate::messages_source::read_client_state::<P::TargetChain, P::SourceChain>(
P::TargetChain, &self.client,
HashOf<P::SourceChain>, None,
BlockNumberOf<P::SourceChain>, P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD,
>(&self.client, P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD) )
.await? .await?
.best_finalized_peer_at_best_self .best_finalized_peer_at_best_self)
.0)
} }
async fn submit_finality_proof( async fn submit_finality_proof(
@@ -234,13 +234,15 @@ where
}, },
}, },
SubstrateMessagesSource::<P>::new( SubstrateMessagesSource::<P>::new(
source_client, source_client.clone(),
target_client.clone(),
params.lane_id, params.lane_id,
params.source_transaction_params, params.source_transaction_params,
params.target_to_source_headers_relay, params.target_to_source_headers_relay,
), ),
SubstrateMessagesTarget::<P>::new( SubstrateMessagesTarget::<P>::new(
target_client, target_client,
source_client,
params.lane_id, params.lane_id,
relayer_id_at_source, relayer_id_at_source,
params.target_transaction_params, params.target_transaction_params,
@@ -46,7 +46,7 @@ use messages_relay::{
}; };
use num_traits::{Bounded, Zero}; use num_traits::{Bounded, Zero};
use relay_substrate_client::{ use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, Chain, ChainWithMessages, Client, AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client,
Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra,
TransactionSignScheme, UnsignedTransaction, TransactionSignScheme, UnsignedTransaction,
}; };
@@ -62,7 +62,8 @@ pub type SubstrateMessagesProof<C> = (Weight, FromBridgedChainMessagesProof<Hash
/// Substrate client as Substrate messages source. /// Substrate client as Substrate messages source.
pub struct SubstrateMessagesSource<P: SubstrateMessageLane> { pub struct SubstrateMessagesSource<P: SubstrateMessageLane> {
client: Client<P::SourceChain>, source_client: Client<P::SourceChain>,
target_client: Client<P::TargetChain>,
lane_id: LaneId, lane_id: LaneId,
transaction_params: TransactionParams<AccountKeyPairOf<P::SourceTransactionSignScheme>>, transaction_params: TransactionParams<AccountKeyPairOf<P::SourceTransactionSignScheme>>,
target_to_source_headers_relay: Option<OnDemandHeadersRelay<P::TargetChain>>, target_to_source_headers_relay: Option<OnDemandHeadersRelay<P::TargetChain>>,
@@ -71,13 +72,15 @@ pub struct SubstrateMessagesSource<P: SubstrateMessageLane> {
impl<P: SubstrateMessageLane> SubstrateMessagesSource<P> { impl<P: SubstrateMessageLane> SubstrateMessagesSource<P> {
/// Create new Substrate headers source. /// Create new Substrate headers source.
pub fn new( pub fn new(
client: Client<P::SourceChain>, source_client: Client<P::SourceChain>,
target_client: Client<P::TargetChain>,
lane_id: LaneId, lane_id: LaneId,
transaction_params: TransactionParams<AccountKeyPairOf<P::SourceTransactionSignScheme>>, transaction_params: TransactionParams<AccountKeyPairOf<P::SourceTransactionSignScheme>>,
target_to_source_headers_relay: Option<OnDemandHeadersRelay<P::TargetChain>>, target_to_source_headers_relay: Option<OnDemandHeadersRelay<P::TargetChain>>,
) -> Self { ) -> Self {
SubstrateMessagesSource { SubstrateMessagesSource {
client, source_client,
target_client,
lane_id, lane_id,
transaction_params, transaction_params,
target_to_source_headers_relay, target_to_source_headers_relay,
@@ -89,7 +92,7 @@ impl<P: SubstrateMessageLane> SubstrateMessagesSource<P> {
&self, &self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>, id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Option<OutboundLaneData>, SubstrateError> { ) -> Result<Option<OutboundLaneData>, SubstrateError> {
self.client self.source_client
.storage_value( .storage_value(
outbound_lane_data_key( outbound_lane_data_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME, P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
@@ -102,14 +105,15 @@ impl<P: SubstrateMessageLane> SubstrateMessagesSource<P> {
/// Ensure that the messages pallet at source chain is active. /// Ensure that the messages pallet at source chain is active.
async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> { async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> {
ensure_messages_pallet_active::<P::SourceChain, P::TargetChain>(&self.client).await ensure_messages_pallet_active::<P::SourceChain, P::TargetChain>(&self.source_client).await
} }
} }
impl<P: SubstrateMessageLane> Clone for SubstrateMessagesSource<P> { impl<P: SubstrateMessageLane> Clone for SubstrateMessagesSource<P> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
client: self.client.clone(), source_client: self.source_client.clone(),
target_client: self.target_client.clone(),
lane_id: self.lane_id, lane_id: self.lane_id,
transaction_params: self.transaction_params.clone(), transaction_params: self.transaction_params.clone(),
target_to_source_headers_relay: self.target_to_source_headers_relay.clone(), target_to_source_headers_relay: self.target_to_source_headers_relay.clone(),
@@ -122,7 +126,8 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesSource<P> {
type Error = SubstrateError; type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> { async fn reconnect(&mut self) -> Result<(), SubstrateError> {
self.client.reconnect().await self.source_client.reconnect().await?;
self.target_client.reconnect().await
} }
} }
@@ -136,15 +141,15 @@ where
async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> { async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because // we can't continue to deliver confirmations if source node is out of sync, because
// it may have already received confirmations that we're going to deliver // it may have already received confirmations that we're going to deliver
self.client.ensure_synced().await?; self.source_client.ensure_synced().await?;
// we can't relay confirmations if messages pallet at source chain is halted // we can't relay confirmations if messages pallet at source chain is halted
self.ensure_pallet_active().await?; self.ensure_pallet_active().await?;
read_client_state::< read_client_state(
_, &self.source_client,
<MessageLaneAdapter<P> as MessageLane>::TargetHeaderHash, Some(&self.target_client),
<MessageLaneAdapter<P> as MessageLane>::TargetHeaderNumber, P::TargetChain::BEST_FINALIZED_HEADER_ID_METHOD,
>(&self.client, P::TargetChain::BEST_FINALIZED_HEADER_ID_METHOD) )
.await .await
} }
@@ -183,7 +188,7 @@ where
SubstrateError, SubstrateError,
> { > {
let encoded_response = self let encoded_response = self
.client .source_client
.state_call( .state_call(
P::TargetChain::TO_CHAIN_MESSAGE_DETAILS_METHOD.into(), P::TargetChain::TO_CHAIN_MESSAGE_DETAILS_METHOD.into(),
Bytes((self.lane_id, nonces.start(), nonces.end()).encode()), Bytes((self.lane_id, nonces.start(), nonces.end()).encode()),
@@ -230,7 +235,12 @@ where
)); ));
} }
let proof = self.client.prove_storage(storage_keys, id.1).await?.iter_nodes().collect(); let proof = self
.source_client
.prove_storage(storage_keys, id.1)
.await?
.iter_nodes()
.collect();
let proof = FromBridgedChainMessagesProof { let proof = FromBridgedChainMessagesProof {
bridged_header_hash: id.1, bridged_header_hash: id.1,
storage_proof: proof, storage_proof: proof,
@@ -246,10 +256,11 @@ where
_generated_at_block: TargetHeaderIdOf<MessageLaneAdapter<P>>, _generated_at_block: TargetHeaderIdOf<MessageLaneAdapter<P>>,
proof: <MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof, proof: <MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof,
) -> Result<(), SubstrateError> { ) -> Result<(), SubstrateError> {
let genesis_hash = *self.client.genesis_hash(); let genesis_hash = *self.source_client.genesis_hash();
let transaction_params = self.transaction_params.clone(); let transaction_params = self.transaction_params.clone();
let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; let (spec_version, transaction_version) =
self.client self.source_client.simple_runtime_version().await?;
self.source_client
.submit_signed_extrinsic( .submit_signed_extrinsic(
self.transaction_params.signer.public().into(), self.transaction_params.signer.public().into(),
move |best_block_id, transaction_nonce| { move |best_block_id, transaction_nonce| {
@@ -278,7 +289,7 @@ where
async fn estimate_confirmation_transaction( async fn estimate_confirmation_transaction(
&self, &self,
) -> <MessageLaneAdapter<P> as MessageLane>::SourceChainBalance { ) -> <MessageLaneAdapter<P> as MessageLane>::SourceChainBalance {
let runtime_version = match self.client.runtime_version().await { let runtime_version = match self.source_client.runtime_version().await {
Ok(v) => v, Ok(v) => v,
Err(_) => return BalanceOf::<P::SourceChain>::max_value(), Err(_) => return BalanceOf::<P::SourceChain>::max_value(),
}; };
@@ -286,14 +297,14 @@ where
let dummy_tx = make_messages_delivery_proof_transaction::<P>( let dummy_tx = make_messages_delivery_proof_transaction::<P>(
runtime_version.spec_version, runtime_version.spec_version,
runtime_version.transaction_version, runtime_version.transaction_version,
self.client.genesis_hash(), self.source_client.genesis_hash(),
&self.transaction_params, &self.transaction_params,
HeaderId(Default::default(), Default::default()), HeaderId(Default::default(), Default::default()),
Zero::zero(), Zero::zero(),
prepare_dummy_messages_delivery_proof::<P::SourceChain, P::TargetChain>(), prepare_dummy_messages_delivery_proof::<P::SourceChain, P::TargetChain>(),
false, false,
)?; )?;
self.client self.source_client
.estimate_extrinsic_fee(dummy_tx) .estimate_extrinsic_fee(dummy_tx)
.await .await
.map(|fee| fee.inclusion_fee()) .map(|fee| fee.inclusion_fee())
@@ -385,19 +396,19 @@ fn prepare_dummy_messages_delivery_proof<SC: Chain, TC: Chain>(
/// This function assumes that the chain that is followed by the `self_client` has /// This function assumes that the chain that is followed by the `self_client` has
/// bridge GRANDPA pallet deployed and it provides `best_finalized_header_id_method_name` /// bridge GRANDPA pallet deployed and it provides `best_finalized_header_id_method_name`
/// runtime API to read the best finalized Bridged chain header. /// runtime API to read the best finalized Bridged chain header.
pub async fn read_client_state<SelfChain, BridgedHeaderHash, BridgedHeaderNumber>( ///
/// If `peer_client` is `None`, the value of `actual_best_finalized_peer_at_best_self` will
/// always match the `best_finalized_peer_at_best_self`.
pub async fn read_client_state<SelfChain, PeerChain>(
self_client: &Client<SelfChain>, self_client: &Client<SelfChain>,
peer_client: Option<&Client<PeerChain>>,
best_finalized_header_id_method_name: &str, best_finalized_header_id_method_name: &str,
) -> Result< ) -> Result<ClientState<HeaderIdOf<SelfChain>, HeaderIdOf<PeerChain>>, SubstrateError>
ClientState<HeaderIdOf<SelfChain>, HeaderId<BridgedHeaderHash, BridgedHeaderNumber>>,
SubstrateError,
>
where where
SelfChain: Chain, SelfChain: Chain,
SelfChain::Header: DeserializeOwned, SelfChain::Header: DeserializeOwned,
SelfChain::Index: DeserializeOwned, SelfChain::Index: DeserializeOwned,
BridgedHeaderHash: Decode, PeerChain: Chain,
BridgedHeaderNumber: Decode,
{ {
// let's read our state first: we need best finalized header hash on **this** chain // let's read our state first: we need best finalized header hash on **this** chain
let self_best_finalized_header_hash = self_client.best_finalized_header_hash().await?; let self_best_finalized_header_hash = self_client.best_finalized_header_hash().await?;
@@ -419,16 +430,27 @@ where
Some(self_best_hash), Some(self_best_hash),
) )
.await?; .await?;
let decoded_best_finalized_peer_on_self: (BridgedHeaderNumber, BridgedHeaderHash) = let decoded_best_finalized_peer_on_self: (BlockNumberOf<PeerChain>, HashOf<PeerChain>) =
Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..]) Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..])
.map_err(SubstrateError::ResponseParseFailed)?; .map_err(SubstrateError::ResponseParseFailed)?;
let peer_on_self_best_finalized_id = let peer_on_self_best_finalized_id =
HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1); HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1);
// read actual header, matching the `peer_on_self_best_finalized_id` from the peer chain
let actual_peer_on_self_best_finalized_id = match peer_client {
Some(peer_client) => {
let actual_peer_on_self_best_finalized =
peer_client.header_by_number(peer_on_self_best_finalized_id.0).await?;
HeaderId(peer_on_self_best_finalized_id.0, actual_peer_on_self_best_finalized.hash())
},
None => peer_on_self_best_finalized_id.clone(),
};
Ok(ClientState { Ok(ClientState {
best_self: self_best_id, best_self: self_best_id,
best_finalized_self: self_best_finalized_id, best_finalized_self: self_best_finalized_id,
best_finalized_peer_at_best_self: peer_on_self_best_finalized_id, best_finalized_peer_at_best_self: peer_on_self_best_finalized_id,
actual_best_finalized_peer_at_best_self: actual_peer_on_self_best_finalized_id,
}) })
} }
@@ -57,7 +57,8 @@ pub type SubstrateMessagesDeliveryProof<C> =
/// Substrate client as Substrate messages target. /// Substrate client as Substrate messages target.
pub struct SubstrateMessagesTarget<P: SubstrateMessageLane> { pub struct SubstrateMessagesTarget<P: SubstrateMessageLane> {
client: Client<P::TargetChain>, target_client: Client<P::TargetChain>,
source_client: Client<P::SourceChain>,
lane_id: LaneId, lane_id: LaneId,
relayer_id_at_source: AccountIdOf<P::SourceChain>, relayer_id_at_source: AccountIdOf<P::SourceChain>,
transaction_params: TransactionParams<AccountKeyPairOf<P::TargetTransactionSignScheme>>, transaction_params: TransactionParams<AccountKeyPairOf<P::TargetTransactionSignScheme>>,
@@ -68,7 +69,8 @@ pub struct SubstrateMessagesTarget<P: SubstrateMessageLane> {
impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> { impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> {
/// Create new Substrate headers target. /// Create new Substrate headers target.
pub fn new( pub fn new(
client: Client<P::TargetChain>, target_client: Client<P::TargetChain>,
source_client: Client<P::SourceChain>,
lane_id: LaneId, lane_id: LaneId,
relayer_id_at_source: AccountIdOf<P::SourceChain>, relayer_id_at_source: AccountIdOf<P::SourceChain>,
transaction_params: TransactionParams<AccountKeyPairOf<P::TargetTransactionSignScheme>>, transaction_params: TransactionParams<AccountKeyPairOf<P::TargetTransactionSignScheme>>,
@@ -76,7 +78,8 @@ impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> {
source_to_target_headers_relay: Option<OnDemandHeadersRelay<P::SourceChain>>, source_to_target_headers_relay: Option<OnDemandHeadersRelay<P::SourceChain>>,
) -> Self { ) -> Self {
SubstrateMessagesTarget { SubstrateMessagesTarget {
client, target_client,
source_client,
lane_id, lane_id,
relayer_id_at_source, relayer_id_at_source,
transaction_params, transaction_params,
@@ -90,7 +93,7 @@ impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> {
&self, &self,
id: TargetHeaderIdOf<MessageLaneAdapter<P>>, id: TargetHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Option<InboundLaneData<AccountIdOf<P::SourceChain>>>, SubstrateError> { ) -> Result<Option<InboundLaneData<AccountIdOf<P::SourceChain>>>, SubstrateError> {
self.client self.target_client
.storage_value( .storage_value(
inbound_lane_data_key( inbound_lane_data_key(
P::SourceChain::WITH_CHAIN_MESSAGES_PALLET_NAME, P::SourceChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
@@ -103,14 +106,15 @@ impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> {
/// Ensure that the messages pallet at target chain is active. /// Ensure that the messages pallet at target chain is active.
async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> { async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> {
ensure_messages_pallet_active::<P::TargetChain, P::SourceChain>(&self.client).await ensure_messages_pallet_active::<P::TargetChain, P::SourceChain>(&self.target_client).await
} }
} }
impl<P: SubstrateMessageLane> Clone for SubstrateMessagesTarget<P> { impl<P: SubstrateMessageLane> Clone for SubstrateMessagesTarget<P> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
client: self.client.clone(), target_client: self.target_client.clone(),
source_client: self.source_client.clone(),
lane_id: self.lane_id, lane_id: self.lane_id,
relayer_id_at_source: self.relayer_id_at_source.clone(), relayer_id_at_source: self.relayer_id_at_source.clone(),
transaction_params: self.transaction_params.clone(), transaction_params: self.transaction_params.clone(),
@@ -125,7 +129,8 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesTarget<P> {
type Error = SubstrateError; type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> { async fn reconnect(&mut self) -> Result<(), SubstrateError> {
self.client.reconnect().await self.target_client.reconnect().await?;
self.source_client.reconnect().await
} }
} }
@@ -140,15 +145,15 @@ where
async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> { async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> {
// we can't continue to deliver messages if target node is out of sync, because // we can't continue to deliver messages if target node is out of sync, because
// it may have already received (some of) messages that we're going to deliver // it may have already received (some of) messages that we're going to deliver
self.client.ensure_synced().await?; self.target_client.ensure_synced().await?;
// we can't relay messages if messages pallet at target chain is halted // we can't relay messages if messages pallet at target chain is halted
self.ensure_pallet_active().await?; self.ensure_pallet_active().await?;
read_client_state::< read_client_state(
_, &self.target_client,
<MessageLaneAdapter<P> as MessageLane>::SourceHeaderHash, Some(&self.source_client),
<MessageLaneAdapter<P> as MessageLane>::SourceHeaderNumber, P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD,
>(&self.client, P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD) )
.await .await
} }
@@ -184,7 +189,7 @@ where
) -> Result<(TargetHeaderIdOf<MessageLaneAdapter<P>>, UnrewardedRelayersState), SubstrateError> ) -> Result<(TargetHeaderIdOf<MessageLaneAdapter<P>>, UnrewardedRelayersState), SubstrateError>
{ {
let encoded_response = self let encoded_response = self
.client .target_client
.state_call( .state_call(
P::SourceChain::FROM_CHAIN_UNREWARDED_RELAYERS_STATE.into(), P::SourceChain::FROM_CHAIN_UNREWARDED_RELAYERS_STATE.into(),
Bytes(self.lane_id.encode()), Bytes(self.lane_id.encode()),
@@ -213,7 +218,7 @@ where
&self.lane_id, &self.lane_id,
); );
let proof = self let proof = self
.client .target_client
.prove_storage(vec![inbound_data_key], id.1) .prove_storage(vec![inbound_data_key], id.1)
.await? .await?
.iter_nodes() .iter_nodes()
@@ -232,12 +237,13 @@ where
nonces: RangeInclusive<MessageNonce>, nonces: RangeInclusive<MessageNonce>,
proof: <MessageLaneAdapter<P> as MessageLane>::MessagesProof, proof: <MessageLaneAdapter<P> as MessageLane>::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, SubstrateError> { ) -> Result<RangeInclusive<MessageNonce>, SubstrateError> {
let genesis_hash = *self.client.genesis_hash(); let genesis_hash = *self.target_client.genesis_hash();
let transaction_params = self.transaction_params.clone(); let transaction_params = self.transaction_params.clone();
let relayer_id_at_source = self.relayer_id_at_source.clone(); let relayer_id_at_source = self.relayer_id_at_source.clone();
let nonces_clone = nonces.clone(); let nonces_clone = nonces.clone();
let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; let (spec_version, transaction_version) =
self.client self.target_client.simple_runtime_version().await?;
self.target_client
.submit_signed_extrinsic( .submit_signed_extrinsic(
self.transaction_params.signer.public().into(), self.transaction_params.signer.public().into(),
move |best_block_id, transaction_nonce| { move |best_block_id, transaction_nonce| {
@@ -281,12 +287,13 @@ where
)) ))
})?; })?;
let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; let (spec_version, transaction_version) =
self.target_client.simple_runtime_version().await?;
// Prepare 'dummy' delivery transaction - we only care about its length and dispatch weight. // Prepare 'dummy' delivery transaction - we only care about its length and dispatch weight.
let delivery_tx = make_messages_delivery_transaction::<P>( let delivery_tx = make_messages_delivery_transaction::<P>(
spec_version, spec_version,
transaction_version, transaction_version,
self.client.genesis_hash(), self.target_client.genesis_hash(),
&self.transaction_params, &self.transaction_params,
HeaderId(Default::default(), Default::default()), HeaderId(Default::default(), Default::default()),
Zero::zero(), Zero::zero(),
@@ -299,7 +306,7 @@ where
), ),
false, false,
)?; )?;
let delivery_tx_fee = self.client.estimate_extrinsic_fee(delivery_tx).await?; let delivery_tx_fee = self.target_client.estimate_extrinsic_fee(delivery_tx).await?;
let inclusion_fee_in_target_tokens = delivery_tx_fee.inclusion_fee(); let inclusion_fee_in_target_tokens = delivery_tx_fee.inclusion_fee();
// The pre-dispatch cost of delivery transaction includes additional fee to cover dispatch // The pre-dispatch cost of delivery transaction includes additional fee to cover dispatch
@@ -321,12 +328,13 @@ where
let expected_refund_in_target_tokens = if total_prepaid_nonces != 0 { let expected_refund_in_target_tokens = if total_prepaid_nonces != 0 {
const WEIGHT_DIFFERENCE: Weight = 100; const WEIGHT_DIFFERENCE: Weight = 100;
let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; let (spec_version, transaction_version) =
self.target_client.simple_runtime_version().await?;
let larger_dispatch_weight = total_dispatch_weight.saturating_add(WEIGHT_DIFFERENCE); let larger_dispatch_weight = total_dispatch_weight.saturating_add(WEIGHT_DIFFERENCE);
let dummy_tx = make_messages_delivery_transaction::<P>( let dummy_tx = make_messages_delivery_transaction::<P>(
spec_version, spec_version,
transaction_version, transaction_version,
self.client.genesis_hash(), self.target_client.genesis_hash(),
&self.transaction_params, &self.transaction_params,
HeaderId(Default::default(), Default::default()), HeaderId(Default::default(), Default::default()),
Zero::zero(), Zero::zero(),
@@ -339,7 +347,8 @@ where
), ),
false, false,
)?; )?;
let larger_delivery_tx_fee = self.client.estimate_extrinsic_fee(dummy_tx).await?; let larger_delivery_tx_fee =
self.target_client.estimate_extrinsic_fee(dummy_tx).await?;
compute_prepaid_messages_refund::<P::TargetChain>( compute_prepaid_messages_refund::<P::TargetChain>(
total_prepaid_nonces, total_prepaid_nonces,
@@ -382,16 +382,20 @@ where
From<<AccountKeyPairOf<P::TransactionSignScheme> as sp_core::Pair>::Public>, From<<AccountKeyPairOf<P::TransactionSignScheme> as sp_core::Pair>::Public>,
P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>, P::TransactionSignScheme: TransactionSignScheme<Chain = P::TargetChain>,
{ {
finality_target.best_finalized_source_block_number().await.map_err(|error| { finality_target
log::error!( .best_finalized_source_block_id()
target: "bridge", .await
"Failed to read best finalized source header from target in {} relay: {:?}", .map_err(|error| {
relay_task_name, log::error!(
error, target: "bridge",
); "Failed to read best finalized source header from target in {} relay: {:?}",
relay_task_name,
error,
);
error error
}) })
.map(|id| id.0)
} }
/// Read first mandatory header in given inclusive range. /// Read first mandatory header in given inclusive range.
+1
View File
@@ -18,6 +18,7 @@ parking_lot = "0.11.0"
bp-messages = { path = "../../primitives/messages" } bp-messages = { path = "../../primitives/messages" }
bp-runtime = { path = "../../primitives/runtime" } bp-runtime = { path = "../../primitives/runtime" }
finality-relay = { path = "../finality" }
relay-utils = { path = "../utils" } relay-utils = { path = "../utils" }
sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -233,6 +233,9 @@ pub struct ClientState<SelfHeaderId, PeerHeaderId> {
/// Best finalized header id of the peer chain read at the best block of this chain (at /// Best finalized header id of the peer chain read at the best block of this chain (at
/// `best_finalized_self`). /// `best_finalized_self`).
pub best_finalized_peer_at_best_self: PeerHeaderId, pub best_finalized_peer_at_best_self: PeerHeaderId,
/// Header id of the peer chain with the number, matching the
/// `best_finalized_peer_at_best_self`.
pub actual_best_finalized_peer_at_best_self: PeerHeaderId,
} }
/// State of source client in one-way message lane. /// State of source client in one-way message lane.
@@ -843,12 +846,14 @@ pub(crate) mod tests {
best_self: HeaderId(0, 0), best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
source_latest_generated_nonce: 1, source_latest_generated_nonce: 1,
target_state: ClientState { target_state: ClientState {
best_self: HeaderId(0, 0), best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
target_latest_received_nonce: 0, target_latest_received_nonce: 0,
..Default::default() ..Default::default()
@@ -888,12 +893,14 @@ pub(crate) mod tests {
best_self: HeaderId(10, 10), best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10), best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
source_latest_generated_nonce: 10, source_latest_generated_nonce: 10,
target_state: ClientState { target_state: ClientState {
best_self: HeaderId(0, 0), best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
target_latest_received_nonce: 0, target_latest_received_nonce: 0,
..Default::default() ..Default::default()
+33 -20
View File
@@ -22,6 +22,7 @@ use crate::{
}; };
use bp_messages::MessageNonce; use bp_messages::MessageNonce;
use finality_relay::SyncLoopMetrics;
use relay_utils::metrics::{ use relay_utils::metrics::{
metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64, metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64,
}; };
@@ -31,8 +32,10 @@ use relay_utils::metrics::{
/// Cloning only clones references. /// Cloning only clones references.
#[derive(Clone)] #[derive(Clone)]
pub struct MessageLaneLoopMetrics { pub struct MessageLaneLoopMetrics {
/// Best finalized block numbers - "source", "source_at_target", "target_at_source".
source_to_target_finality_metrics: SyncLoopMetrics,
/// Best finalized block numbers - "source", "target", "source_at_target", "target_at_source". /// Best finalized block numbers - "source", "target", "source_at_target", "target_at_source".
best_block_numbers: GaugeVec<U64>, target_to_source_finality_metrics: SyncLoopMetrics,
/// Lane state nonces: "source_latest_generated", "source_latest_confirmed", /// Lane state nonces: "source_latest_generated", "source_latest_confirmed",
/// "target_latest_received", "target_latest_confirmed". /// "target_latest_received", "target_latest_confirmed".
lane_state_nonces: GaugeVec<U64>, lane_state_nonces: GaugeVec<U64>,
@@ -42,12 +45,15 @@ impl MessageLaneLoopMetrics {
/// Create and register messages loop metrics. /// Create and register messages loop metrics.
pub fn new(prefix: Option<&str>) -> Result<Self, PrometheusError> { pub fn new(prefix: Option<&str>) -> Result<Self, PrometheusError> {
Ok(MessageLaneLoopMetrics { Ok(MessageLaneLoopMetrics {
best_block_numbers: GaugeVec::new( source_to_target_finality_metrics: SyncLoopMetrics::new(
Opts::new( prefix,
metric_name(prefix, "best_block_numbers"), "source",
"Best finalized block numbers", "source_at_target",
), )?,
&["type"], target_to_source_finality_metrics: SyncLoopMetrics::new(
prefix,
"target",
"target_at_source",
)?, )?,
lane_state_nonces: GaugeVec::new( lane_state_nonces: GaugeVec::new(
Opts::new(metric_name(prefix, "lane_state_nonces"), "Nonces of the lane state"), Opts::new(metric_name(prefix, "lane_state_nonces"), "Nonces of the lane state"),
@@ -58,22 +64,28 @@ impl MessageLaneLoopMetrics {
/// Update source client state metrics. /// Update source client state metrics.
pub fn update_source_state<P: MessageLane>(&self, source_client_state: SourceClientState<P>) { pub fn update_source_state<P: MessageLane>(&self, source_client_state: SourceClientState<P>) {
self.best_block_numbers self.source_to_target_finality_metrics
.with_label_values(&["source"]) .update_best_block_at_source(source_client_state.best_self.0.into());
.set(source_client_state.best_self.0.into()); self.target_to_source_finality_metrics.update_best_block_at_target(
self.best_block_numbers source_client_state.best_finalized_peer_at_best_self.0.into(),
.with_label_values(&["target_at_source"]) );
.set(source_client_state.best_finalized_peer_at_best_self.0.into()); self.target_to_source_finality_metrics.update_using_same_fork(
source_client_state.best_finalized_peer_at_best_self.1 ==
source_client_state.actual_best_finalized_peer_at_best_self.1,
);
} }
/// Update target client state metrics. /// Update target client state metrics.
pub fn update_target_state<P: MessageLane>(&self, target_client_state: TargetClientState<P>) { pub fn update_target_state<P: MessageLane>(&self, target_client_state: TargetClientState<P>) {
self.best_block_numbers self.target_to_source_finality_metrics
.with_label_values(&["target"]) .update_best_block_at_source(target_client_state.best_self.0.into());
.set(target_client_state.best_self.0.into()); self.source_to_target_finality_metrics.update_best_block_at_target(
self.best_block_numbers target_client_state.best_finalized_peer_at_best_self.0.into(),
.with_label_values(&["source_at_target"]) );
.set(target_client_state.best_finalized_peer_at_best_self.0.into()); self.source_to_target_finality_metrics.update_using_same_fork(
target_client_state.best_finalized_peer_at_best_self.1 ==
target_client_state.actual_best_finalized_peer_at_best_self.1,
);
} }
/// Update latest generated nonce at source. /// Update latest generated nonce at source.
@@ -119,7 +131,8 @@ impl MessageLaneLoopMetrics {
impl Metric for MessageLaneLoopMetrics { impl Metric for MessageLaneLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> { fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
register(self.best_block_numbers.clone(), registry)?; self.source_to_target_finality_metrics.register(registry)?;
self.target_to_source_finality_metrics.register(registry)?;
register(self.lane_state_nonces.clone(), registry)?; register(self.lane_state_nonces.clone(), registry)?;
Ok(()) Ok(())
} }
+3 -1
View File
@@ -18,7 +18,7 @@ pub use float_json_value::FloatJsonValueMetric;
pub use global::GlobalMetrics; pub use global::GlobalMetrics;
pub use substrate_prometheus_endpoint::{ pub use substrate_prometheus_endpoint::{
prometheus::core::{Atomic, Collector}, prometheus::core::{Atomic, Collector},
register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, U64, register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, I64, U64,
}; };
use async_std::sync::{Arc, RwLock}; use async_std::sync::{Arc, RwLock};
@@ -30,6 +30,8 @@ mod global;
/// Shared reference to `f64` value that is updated by the metric. /// Shared reference to `f64` value that is updated by the metric.
pub type F64SharedRef = Arc<RwLock<Option<f64>>>; pub type F64SharedRef = Arc<RwLock<Option<f64>>>;
/// Int gauge metric type.
pub type IntGauge = Gauge<U64>;
/// Unparsed address that needs to be used to expose Prometheus metrics. /// Unparsed address that needs to be used to expose Prometheus metrics.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]