Relay Millau && Rialto headers using (future) finality verifier API (#761)

* finality proofs relay

* SyncHeader::is_mandatory

* empty ancestry proof

* logs

* fixed submit condition

* fixed wrong split index

* tick comment

* recent_finality_proofs

* basic finality loop tests

* removed obsolete files

* rename files in substrate relay

* fmt

* clippy

* fixed TODOs

* clippy

* stop syncing if target node is out of sync

* more clippy

* more clippy

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* docs

* moved doc

* typo

* Update relays/finality-relay/src/finality_loop_tests.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/finality-relay/src/finality_loop_tests.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* header_and_finality_proof_by_number -> header_and_finality_proof

* VecDeque isn't required (because of make_contiguous)

* fixed wrong expect

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/substrate/src/rialto_headers_to_millau.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/substrate/src/rialto_headers_to_millau.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* RialtoSyncHeader

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* removed wrong comment

* Update relays/finality-relay/src/finality_loop.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* fix used runtime methods names

* fix for new jsonrpsee

* fix comment

* initialize finality verifier pallet

* fmt

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2021-03-08 13:18:53 +03:00
committed by Bastian Köcher
parent e13ff320ea
commit f87053c1cb
31 changed files with 1490 additions and 944 deletions
+1
View File
@@ -61,6 +61,7 @@ pub use frame_support::{
pub use frame_system::Call as SystemCall;
pub use pallet_balances::Call as BalancesCall;
pub use pallet_finality_verifier::Call as FinalityBridgeRialtoCall;
pub use pallet_message_lane::Call as MessageLaneCall;
pub use pallet_substrate_bridge::Call as BridgeRialtoCall;
pub use pallet_sudo::Call as SudoCall;
+1
View File
@@ -69,6 +69,7 @@ pub use frame_system::Call as SystemCall;
pub use pallet_balances::Call as BalancesCall;
pub use pallet_bridge_currency_exchange::Call as BridgeCurrencyExchangeCall;
pub use pallet_bridge_eth_poa::Call as BridgeEthPoACall;
pub use pallet_finality_verifier::Call as FinalityBridgeMillauCall;
pub use pallet_message_lane::Call as MessageLaneCall;
pub use pallet_substrate_bridge::Call as BridgeMillauCall;
pub use pallet_sudo::Call as SudoCall;
+1 -1
View File
@@ -417,7 +417,7 @@ where
} else {
// We don't have a scheduled change in storage at the moment. Let's check if the current
// header signals an authority set change.
if let Some(change) = verifier::find_scheduled_change(&header) {
if let Some(change) = bp_header_chain::find_grandpa_authorities_scheduled_change(&header) {
let next_set = AuthoritySet {
authorities: change.next_authorities,
set_id: storage.current_authority_set().set_id + 1,
+3 -18
View File
@@ -25,10 +25,8 @@
use crate::storage::{ImportedHeader, ScheduledChange};
use crate::BridgeStorage;
use bp_header_chain::{justification::verify_justification, AuthoritySet};
use bp_header_chain::{find_grandpa_authorities_scheduled_change, justification::verify_justification, AuthoritySet};
use finality_grandpa::voter_set::VoterSet;
use sp_finality_grandpa::{ConsensusLog, GRANDPA_ENGINE_ID};
use sp_runtime::generic::OpaqueDigestItemId;
use sp_runtime::traits::{CheckedAdd, Header as HeaderT, One};
use sp_runtime::RuntimeDebug;
use sp_std::{prelude::Vec, vec};
@@ -142,7 +140,7 @@ where
// time. While this is not strictly true of GRANDPA (it can have multiple pending changes,
// even across forks), this assumption simplifies our tracking of authority set changes.
let mut signal_hash = parent_header.signal_hash;
let scheduled_change = find_scheduled_change(&header);
let scheduled_change = find_grandpa_authorities_scheduled_change(&header);
// Check if our fork is expecting an authority set change
let requires_justification = if let Some(hash) = signal_hash {
@@ -339,19 +337,6 @@ where
Some(ancestors)
}
pub(crate) fn find_scheduled_change<H: HeaderT>(header: &H) -> Option<sp_finality_grandpa::ScheduledChange<H::Number>> {
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
let filter_log = |log: ConsensusLog<H::Number>| match log {
ConsensusLog::ScheduledChange(change) => Some(change),
_ => None,
};
// find the first consensus digest with the right ID which converts to
// the right kind of consensus log.
header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -361,7 +346,7 @@ mod tests {
use codec::Encode;
use frame_support::{assert_err, assert_ok};
use frame_support::{StorageMap, StorageValue};
use sp_finality_grandpa::{AuthorityId, SetId};
use sp_finality_grandpa::{AuthorityId, ConsensusLog, SetId, GRANDPA_ENGINE_ID};
use sp_runtime::{Digest, DigestItem};
fn schedule_next_change(
+18 -2
View File
@@ -26,9 +26,9 @@ use core::default::Default;
use core::fmt::Debug;
#[cfg(feature = "std")]
use serde::{Deserialize, Serialize};
use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::Header as HeaderT;
use sp_finality_grandpa::{AuthorityList, ConsensusLog, SetId, GRANDPA_ENGINE_ID};
use sp_runtime::RuntimeDebug;
use sp_runtime::{generic::OpaqueDigestItemId, traits::Header as HeaderT};
use sp_std::vec::Vec;
pub mod justification;
@@ -140,6 +140,22 @@ impl<H: HeaderT> AncestryChecker<H, Vec<H>> for LinearAncestryChecker {
}
}
/// Find header digest that schedules next GRANDPA authorities set.
pub fn find_grandpa_authorities_scheduled_change<H: HeaderT>(
header: &H,
) -> Option<sp_finality_grandpa::ScheduledChange<H::Number>> {
let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
let filter_log = |log: ConsensusLog<H::Number>| match log {
ConsensusLog::ScheduledChange(change) => Some(change),
_ => None,
};
// find the first consensus digest with the right ID which converts to
// the right kind of consensus log.
header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}
#[cfg(test)]
mod tests {
use super::*;
+1 -12
View File
@@ -229,19 +229,8 @@ pub fn max_extrinsic_size() -> u32 {
*BlockLength::get().max.get(DispatchClass::Normal)
}
/// Name of the `MillauHeaderApi::best_block` runtime method.
pub const BEST_MILLAU_BLOCKS_METHOD: &str = "MillauHeaderApi_best_blocks";
/// Name of the `MillauHeaderApi::finalized_block` runtime method.
pub const FINALIZED_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_finalized_block";
/// Name of the `MillauHeaderApi::is_known_block` runtime method.
pub const IS_KNOWN_MILLAU_BLOCK_METHOD: &str = "MillauHeaderApi_is_known_block";
/// Name of the `MillauHeaderApi::incomplete_headers` runtime method.
pub const INCOMPLETE_MILLAU_HEADERS_METHOD: &str = "MillauHeaderApi_incomplete_headers";
/// Name of the `RialtoFinalityApi::best_finalized` runtime method.
/// Name of the `MillauFinalityApi::best_finalized` runtime method.
pub const BEST_FINALIZED_MILLAU_HEADER_METHOD: &str = "MillauFinalityApi_best_finalized";
/// Name of the `RialtoFinalityApi::is_known_header` runtime method.
pub const IS_KNOW_MILLAU_HEADER_METHOD: &str = "MillauFinalityApi_is_known_header";
/// Name of the `ToMillauOutboundLaneApi::estimate_message_delivery_and_dispatch_fee` runtime method.
pub const TO_MILLAU_ESTIMATE_MESSAGE_FEE_METHOD: &str =
-11
View File
@@ -190,19 +190,8 @@ pub fn max_extrinsic_size() -> u32 {
*BlockLength::get().max.get(DispatchClass::Normal)
}
/// Name of the `RialtoHeaderApi::best_blocks` runtime method.
pub const BEST_RIALTO_BLOCKS_METHOD: &str = "RialtoHeaderApi_best_blocks";
/// Name of the `RialtoHeaderApi::finalized_block` runtime method.
pub const FINALIZED_RIALTO_BLOCK_METHOD: &str = "RialtoHeaderApi_finalized_block";
/// Name of the `RialtoHeaderApi::is_known_block` runtime method.
pub const IS_KNOWN_RIALTO_BLOCK_METHOD: &str = "RialtoHeaderApi_is_known_block";
/// Name of the `RialtoHeaderApi::incomplete_headers` runtime method.
pub const INCOMPLETE_RIALTO_HEADERS_METHOD: &str = "RialtoHeaderApi_incomplete_headers";
/// Name of the `RialtoFinalityApi::best_finalized` runtime method.
pub const BEST_FINALIZED_RIALTO_HEADER_METHOD: &str = "RialtoFinalityApi_best_finalized";
/// Name of the `RialtoFinalityApi::is_known_header` runtime method.
pub const IS_KNOW_RIALTO_HEADER_METHOD: &str = "RialtoFinalityApi_is_known_header";
/// Name of the `ToRialtoOutboundLaneApi::estimate_message_delivery_and_dispatch_fee` runtime method.
pub const TO_RIALTO_ESTIMATE_MESSAGE_FEE_METHOD: &str =
+20
View File
@@ -0,0 +1,20 @@
[package]
name = "finality-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
description = "Finality proofs relay"
[dependencies]
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
futures = "0.3.5"
headers-relay = { path = "../headers-relay" }
log = "0.4.11"
num-traits = "0.2"
relay-utils = { path = "../utils" }
[dev-dependencies]
parking_lot = "0.11.0"
@@ -0,0 +1,581 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! The loop basically reads all missing headers and their finality proofs from the source client.
//! The proof for the best possible header is then submitted to the target node. The only exception
//! is the mandatory headers, which we always submit to the target node. For such headers, we
//! assume that the persistent proof either exists, or will eventually become available.
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use futures::{select, Future, FutureExt, Stream, StreamExt};
use headers_relay::sync_loop_metrics::SyncLoopMetrics;
use num_traits::{One, Saturating};
use relay_utils::{
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
relay_loop::Client as RelayClient,
retry_backoff, FailedClient, MaybeConnectionError,
};
use std::{
pin::Pin,
time::{Duration, Instant},
};
/// Finality proof synchronization loop parameters.
#[derive(Debug, Clone)]
pub struct FinalitySyncParams {
/// Interval at which we check updates on both clients. Normally should be larger than
/// `min(source_block_time, target_block_time)`.
///
/// This parameter may be used to limit transactions rate. Increase the value && you'll get
/// infrequent updates => sparse headers => potential slow down of bridge applications, but pallet storage
/// won't be super large. Decrease the value to near `source_block_time` and you'll get
/// transaction for (almost) every block of the source chain => all source headers will be known
/// to the target chain => bridge applications will run faster, but pallet storage may explode
/// (but if pruning is there, then it's fine).
pub tick: Duration,
/// Number of finality proofs to keep in internal buffer between loop wakeups.
///
/// While in "major syncing" state, we still read finality proofs from the stream. They're stored
/// in the internal buffer between loop wakeups. When we're close to the tip of the chain, we may
/// meet finality delays if headers are not finalized frequently. So instead of waiting for next
/// finality proof to appear in the stream, we may use existing proof from that buffer.
pub recent_finality_proofs_limit: usize,
/// Timeout before we treat our transactions as lost and restart the whole sync process.
pub stall_timeout: Duration,
}
/// Source client used in finality synchronization loop.
#[async_trait]
pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
/// Stream of new finality proofs. The stream is allowed to miss proofs for some
/// headers, even if those headers are mandatory.
type FinalityProofsStream: Stream<Item = P::FinalityProof>;
/// Get best finalized block number.
async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;
/// Get canonical header and its finality proof by number.
async fn header_and_finality_proof(
&self,
number: P::Number,
) -> Result<(P::Header, Option<P::FinalityProof>), Self::Error>;
/// Subscribe to new finality proofs.
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Self::Error>;
}
/// Target client used in finality synchronization loop.
#[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
/// Get best finalized source block number.
async fn best_finalized_source_block_number(&self) -> Result<P::Number, Self::Error>;
/// Submit header finality proof.
async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), Self::Error>;
}
/// Run finality proofs synchronization loop.
pub fn run<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
let exit_signal = exit_signal.shared();
let metrics_global = GlobalMetrics::default();
let metrics_sync = SyncLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
&metrics_global,
&metrics_sync,
);
relay_utils::relay_loop::run(
relay_utils::relay_loop::RECONNECT_DELAY,
source_client,
target_client,
|source_client, target_client| {
run_until_connection_lost(
source_client,
target_client,
sync_params.clone(),
if metrics_enabled {
Some(metrics_global.clone())
} else {
None
},
if metrics_enabled {
Some(metrics_sync.clone())
} else {
None
},
exit_signal.clone(),
)
},
);
}
/// Unjustified headers container. Ordered by header number.
pub(crate) type UnjustifiedHeaders<P> = Vec<<P as FinalitySyncPipeline>::Header>;
/// Finality proofs container. Ordered by target header number.
pub(crate) type FinalityProofs<P> = Vec<(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)>;
/// Error that may happen inside finality synchronization loop.
#[derive(Debug)]
enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
/// Source client request has failed with given error.
Source(SourceError),
/// Target client request has failed with given error.
Target(TargetError),
/// Finality proof for mandatory header is missing from the source node.
MissingMandatoryFinalityProof(P::Number),
/// The synchronization has stalled.
Stalled,
}
impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
where
P: FinalitySyncPipeline,
SourceError: MaybeConnectionError,
TargetError: MaybeConnectionError,
{
fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
match *self {
Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
Error::Stalled => Err(FailedClient::Both),
_ => Ok(()),
}
}
}
/// Information about transaction that we have submitted.
#[derive(Debug, Clone)]
struct Transaction<Number> {
/// Time when we have submitted this transaction.
pub time: Instant,
/// The number of the header we have submitted.
pub submitted_header_number: Number,
}
/// Finality proofs stream that may be restarted.
struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted.
needs_restart: bool,
/// The stream itself.
stream: Pin<Box<S>>,
}
/// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress.
progress: &'a mut (Instant, Option<P::Number>),
/// Finality proofs stream.
finality_proofs_stream: &'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
/// Recent finality proofs that we have read from the stream.
recent_finality_proofs: &'a mut FinalityProofs<P>,
/// Last transaction that we have submitted to the target node.
last_transaction: Option<Transaction<P::Number>>,
}
async fn run_until_connection_lost<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_global: Option<GlobalMetrics>,
metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let restart_finality_proofs_stream = || async {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);
FailedClient::Source
})
};
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
let mut finality_proofs_stream = RestartableFinalityProofsStream {
needs_restart: false,
stream: Box::pin(restart_finality_proofs_stream().await?),
};
let mut recent_finality_proofs = Vec::new();
let mut progress = (Instant::now(), None);
let mut retry_backoff = retry_backoff();
let mut last_transaction = None;
loop {
// run loop iteration
let iteration_result = run_loop_iteration(
&source_client,
&target_client,
FinalityLoopState {
progress: &mut progress,
finality_proofs_stream: &mut finality_proofs_stream,
recent_finality_proofs: &mut recent_finality_proofs,
last_transaction: last_transaction.clone(),
},
&sync_params,
&metrics_sync,
)
.await;
// update global metrics
if let Some(ref metrics_global) = metrics_global {
metrics_global.update().await;
}
// deal with errors
let next_tick = match iteration_result {
Ok(updated_last_transaction) => {
last_transaction = updated_last_transaction;
retry_backoff.reset();
sync_params.tick
}
Err(error) => {
log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
error.fail_if_connection_error()?;
retry_backoff
.next_backoff()
.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
}
};
if finality_proofs_stream.needs_restart {
finality_proofs_stream.needs_restart = false;
finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
}
// wait till exit signal, or new source block
select! {
_ = async_std::task::sleep(next_tick).fuse() => {},
_ = exit_signal => return Ok(()),
}
}
}
async fn run_loop_iteration<P, SC, TC>(
source_client: &SC,
target_client: &TC,
state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
sync_params: &FinalitySyncParams,
metrics_sync: &Option<SyncLoopMetrics>,
) -> Result<Option<Transaction<P::Number>>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
{
// read best source headers ids from source and target nodes
let best_number_at_source = source_client
.best_finalized_block_number()
.await
.map_err(Error::Source)?;
let best_number_at_target = target_client
.best_finalized_source_block_number()
.await
.map_err(Error::Target)?;
if let Some(ref metrics_sync) = *metrics_sync {
metrics_sync.update_best_block_at_source(best_number_at_source);
metrics_sync.update_best_block_at_target(best_number_at_target);
}
*state.progress = print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);
// if we have already submitted header, then we just need to wait for it
// if we're waiting too much, then we believe our transaction has been lost and restart sync
if let Some(last_transaction) = state.last_transaction {
if best_number_at_target >= last_transaction.submitted_header_number {
// transaction has been mined && we can continue
} else if last_transaction.time.elapsed() > sync_params.stall_timeout {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
return Err(Error::Stalled);
} else {
return Ok(Some(last_transaction));
}
}
// submit new header if we have something new
match select_header_to_submit(
source_client,
target_client,
state.finality_proofs_stream,
state.recent_finality_proofs,
best_number_at_source,
best_number_at_target,
sync_params,
)
.await?
{
Some((header, justification)) => {
let new_transaction = Transaction {
time: Instant::now(),
submitted_header_number: header.number(),
};
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
new_transaction.submitted_header_number,
P::TARGET_NAME,
);
target_client
.submit_finality_proof(header, justification)
.await
.map_err(Error::Target)?;
Ok(Some(new_transaction))
}
None => Ok(None),
}
}
async fn select_header_to_submit<P, SC, TC>(
source_client: &SC,
_target_client: &TC,
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
recent_finality_proofs: &mut FinalityProofs<P>,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
sync_params: &FinalitySyncParams,
) -> Result<Option<(P::Header, P::FinalityProof)>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
{
let mut selected_finality_proof = None;
let mut unjustified_headers = Vec::new();
// to see that the loop is progressing
log::trace!(
target: "bridge",
"Considering range of headers ({:?}; {:?}]",
best_number_at_target,
best_number_at_source,
);
// read missing headers. if we see that the header schedules GRANDPA change, we need to
// submit this header
let mut header_number = best_number_at_target + One::one();
while header_number <= best_number_at_source {
let (header, finality_proof) = source_client
.header_and_finality_proof(header_number)
.await
.map_err(Error::Source)?;
let is_mandatory = header.is_mandatory();
match (is_mandatory, finality_proof) {
(true, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
return Ok(Some((header, finality_proof)));
}
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
selected_finality_proof = Some((header, finality_proof));
prune_unjustified_headers::<P>(header_number, &mut unjustified_headers);
}
(false, None) => {
unjustified_headers.push(header);
}
}
header_number = header_number + One::one();
}
// see if we can improve finality by using recent finality proofs
if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() {
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;
// find last proof from intersection
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index];
if intersection.contains(selected_header_number) {
// now remove all obsolete headers and extract selected header
let selected_header = prune_unjustified_headers::<P>(*selected_header_number, &mut unjustified_headers)
.expect("unjustified_headers contain all headers from intersection; qed");
selected_finality_proof = Some((selected_header, finality_proof.clone()));
}
}
// read all proofs from the stream, probably selecting updated proof that we're going to submit
loop {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Some(Some(finality_proof)) => finality_proof,
Some(None) => {
finality_proofs_stream.needs_restart = true;
break;
}
None => break,
};
let finality_proof_target_header_number = match finality_proof.target_header_number() {
Some(target_header_number) => target_header_number,
None => {
continue;
}
};
let justified_header =
prune_unjustified_headers::<P>(finality_proof_target_header_number, &mut unjustified_headers);
if let Some(justified_header) = justified_header {
recent_finality_proofs.clear();
selected_finality_proof = Some((justified_header, finality_proof));
} else {
// the number of proofs read during single wakeup is expected to be low, so we aren't pruning
// `recent_finality_proofs` collection too often
recent_finality_proofs.push((finality_proof_target_header_number, finality_proof));
}
}
// remove obsolete 'recent' finality proofs + keep its size under certain limit
let oldest_finality_proof_to_keep = selected_finality_proof
.as_ref()
.map(|(header, _)| header.number())
.unwrap_or(best_number_at_target);
prune_recent_finality_proofs::<P>(
oldest_finality_proof_to_keep,
recent_finality_proofs,
sync_params.recent_finality_proofs_limit,
);
Ok(selected_finality_proof)
}
/// Remove headers from `unjustified_headers` collection with number lower or equal than `justified_header_number`.
///
/// Returns the header that matches `justified_header_number` (if any).
pub(crate) fn prune_unjustified_headers<P: FinalitySyncPipeline>(
justified_header_number: P::Number,
unjustified_headers: &mut UnjustifiedHeaders<P>,
) -> Option<P::Header> {
prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| {
header.number()
})
}
pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
justified_header_number: P::Number,
recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize,
) {
prune_ordered_vec(
justified_header_number,
recent_finality_proofs,
recent_finality_proofs_limit,
|(header_number, _)| *header_number,
);
}
fn prune_ordered_vec<T, Number: relay_utils::BlockNumberBase>(
header_number: Number,
ordered_vec: &mut Vec<T>,
maximal_vec_size: usize,
extract_header_number: impl Fn(&T) -> Number,
) -> Option<T> {
let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number);
// first extract element we're interested in
let extracted_element = match position {
Ok(position) => {
let updated_vec = ordered_vec.split_off(position + 1);
let extracted_element = ordered_vec.pop().expect(
"binary_search_by_key has returned Ok(); so there's element at `position`;\
we're splitting vec at `position+1`; so we have pruned at least 1 element;\
qed",
);
*ordered_vec = updated_vec;
Some(extracted_element)
}
Err(position) => {
*ordered_vec = ordered_vec.split_off(position);
None
}
};
// now - limit vec by size
let split_index = ordered_vec.len().saturating_sub(maximal_vec_size);
*ordered_vec = ordered_vec.split_off(split_index);
extracted_element
}
fn print_sync_progress<P: FinalitySyncPipeline>(
progress_context: (Instant, Option<P::Number>),
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> (Instant, Option<P::Number>) {
let (prev_time, prev_best_number_at_target) = progress_context;
let now = Instant::now();
let need_update = now - prev_time > Duration::from_secs(10)
|| prev_best_number_at_target
.map(|prev_best_number_at_target| {
best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into()
})
.unwrap_or(true);
if !need_update {
return (prev_time, prev_best_number_at_target);
}
log::info!(
target: "bridge",
"Synced {:?} of {:?} headers",
best_number_at_target,
best_number_at_source,
);
(now, Some(best_number_at_target))
}
@@ -0,0 +1,339 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Tests for finality synchronization loop.
#![cfg(test)]
use crate::finality_loop::{
prune_recent_finality_proofs, prune_unjustified_headers, run, FinalityProofs, FinalitySyncParams, SourceClient,
TargetClient, UnjustifiedHeaders,
};
use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use relay_utils::{relay_loop::Client as RelayClient, MaybeConnectionError};
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
type IsMandatory = bool;
type TestNumber = u64;
#[derive(Debug, Clone)]
enum TestError {
NonConnection,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
#[derive(Debug, Clone)]
struct TestFinalitySyncPipeline;
impl FinalitySyncPipeline for TestFinalitySyncPipeline {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type Hash = u64;
type Number = TestNumber;
type Header = TestSourceHeader;
type FinalityProof = TestFinalityProof;
}
#[derive(Debug, Clone, PartialEq)]
struct TestSourceHeader(IsMandatory, TestNumber);
impl SourceHeader<TestNumber> for TestSourceHeader {
fn number(&self) -> TestNumber {
self.1
}
fn is_mandatory(&self) -> bool {
self.0
}
}
#[derive(Debug, Clone, PartialEq)]
struct TestFinalityProof(Option<TestNumber>);
impl FinalityProof<TestNumber> for TestFinalityProof {
fn target_header_number(&self) -> Option<TestNumber> {
self.0
}
}
#[derive(Debug, Clone, Default)]
struct ClientsData {
source_best_block_number: TestNumber,
source_headers: HashMap<TestNumber, (TestSourceHeader, Option<TestFinalityProof>)>,
source_proofs: Vec<TestFinalityProof>,
target_best_block_number: TestNumber,
target_headers: Vec<(TestSourceHeader, TestFinalityProof)>,
}
#[derive(Clone)]
struct TestSourceClient {
on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestSourceClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof>>>;
async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
Ok(data.source_best_block_number)
}
async fn header_and_finality_proof(
&self,
number: TestNumber,
) -> Result<(TestSourceHeader, Option<TestFinalityProof>), TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
data.source_headers
.get(&number)
.cloned()
.ok_or(TestError::NonConnection)
}
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
Ok(futures::stream::iter(data.source_proofs.clone()).boxed())
}
}
#[derive(Clone)]
struct TestTargetClient {
on_method_call: Arc<dyn Fn(&mut ClientsData) + Send + Sync>,
data: Arc<Mutex<ClientsData>>,
}
#[async_trait]
impl RelayClient for TestTargetClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unreachable!()
}
}
#[async_trait]
impl TargetClient<TestFinalitySyncPipeline> for TestTargetClient {
async fn best_finalized_source_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
Ok(data.target_best_block_number)
}
async fn submit_finality_proof(&self, header: TestSourceHeader, proof: TestFinalityProof) -> Result<(), TestError> {
let mut data = self.data.lock();
(self.on_method_call)(&mut *data);
data.target_best_block_number = header.number();
data.target_headers.push((header, proof));
Ok(())
}
}
fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static) -> ClientsData {
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
let internal_state_function: Arc<dyn Fn(&mut ClientsData) + Send + Sync> = Arc::new(move |data| {
if state_function(data) {
exit_sender.unbounded_send(()).unwrap();
}
});
let clients_data = Arc::new(Mutex::new(ClientsData {
source_best_block_number: 10,
source_headers: vec![
(6, (TestSourceHeader(false, 6), None)),
(7, (TestSourceHeader(false, 7), Some(TestFinalityProof(Some(7))))),
(8, (TestSourceHeader(true, 8), Some(TestFinalityProof(Some(8))))),
(9, (TestSourceHeader(false, 9), Some(TestFinalityProof(Some(9))))),
(10, (TestSourceHeader(false, 10), None)),
]
.into_iter()
.collect(),
source_proofs: vec![TestFinalityProof(Some(12)), TestFinalityProof(Some(14))],
target_best_block_number: 5,
target_headers: vec![],
}));
let source_client = TestSourceClient {
on_method_call: internal_state_function.clone(),
data: clients_data.clone(),
};
let target_client = TestTargetClient {
on_method_call: internal_state_function,
data: clients_data.clone(),
};
let sync_params = FinalitySyncParams {
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
};
run(
source_client,
target_client,
sync_params,
None,
exit_receiver.into_future().map(|(_, _)| ()),
);
let clients_data = clients_data.lock().clone();
clients_data
}
#[test]
fn finality_sync_loop_works() {
let client_data = run_sync_loop(|data| {
// header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, because
// header#8 has persistent finality proof && it is mandatory => it is submitted
// header#9 has persistent finality proof, but it isn't mandatory => it is submitted, because
// there are no more persistent finality proofs
//
// once this ^^^ is done, we generate more blocks && read proof for blocks 12, 14 and 16 from the stream
// but we only submit proof for 16
//
// proof for block 15 is ignored - we haven't managed to decode it
if data.target_best_block_number == 9 {
data.source_best_block_number = 17;
data.source_headers.insert(11, (TestSourceHeader(false, 11), None));
data.source_headers
.insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(Some(12)))));
data.source_headers.insert(13, (TestSourceHeader(false, 13), None));
data.source_headers
.insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(Some(14)))));
data.source_headers
.insert(15, (TestSourceHeader(false, 15), Some(TestFinalityProof(None))));
data.source_headers
.insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(Some(16)))));
data.source_headers.insert(17, (TestSourceHeader(false, 17), None));
}
data.target_best_block_number == 16
});
assert_eq!(
client_data.target_headers,
vec![
(TestSourceHeader(true, 8), TestFinalityProof(Some(8))),
(TestSourceHeader(false, 9), TestFinalityProof(Some(9))),
(TestSourceHeader(false, 16), TestFinalityProof(Some(16))),
],
);
}
#[test]
fn prune_unjustified_headers_works() {
let original_unjustified_headers: UnjustifiedHeaders<TestFinalitySyncPipeline> = vec![
TestSourceHeader(false, 10),
TestSourceHeader(false, 13),
TestSourceHeader(false, 15),
TestSourceHeader(false, 17),
TestSourceHeader(false, 19),
]
.into_iter()
.collect();
// when header is in the collection
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(10, &mut unjustified_headers),
Some(TestSourceHeader(false, 10)),
);
assert_eq!(&original_unjustified_headers[1..], unjustified_headers,);
// when the header doesn't exist in the collection
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(11, &mut unjustified_headers),
None,
);
assert_eq!(&original_unjustified_headers[1..], unjustified_headers,);
// when last entry is pruned
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(19, &mut unjustified_headers),
Some(TestSourceHeader(false, 19)),
);
assert_eq!(&original_unjustified_headers[5..], unjustified_headers,);
// when we try and prune past last entry
let mut unjustified_headers = original_unjustified_headers.clone();
assert_eq!(
prune_unjustified_headers::<TestFinalitySyncPipeline>(20, &mut unjustified_headers),
None,
);
assert_eq!(&original_unjustified_headers[5..], unjustified_headers,);
}
#[test]
fn prune_recent_finality_proofs_works() {
let original_recent_finality_proofs: FinalityProofs<TestFinalitySyncPipeline> = vec![
(10, TestFinalityProof(Some(10))),
(13, TestFinalityProof(Some(13))),
(15, TestFinalityProof(Some(15))),
(17, TestFinalityProof(Some(17))),
(19, TestFinalityProof(Some(19))),
]
.into_iter()
.collect();
// when there's proof for justified header in the vec
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 1024);
assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
// when there are no proof for justified header in the vec
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(11, &mut recent_finality_proofs, 1024);
assert_eq!(&original_recent_finality_proofs[1..], recent_finality_proofs,);
// when there are too many entries after initial prune && they also need to be pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(10, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[3..], recent_finality_proofs,);
// when last entry is pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(19, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
// when post-last entry is pruned
let mut recent_finality_proofs = original_recent_finality_proofs.clone();
prune_recent_finality_proofs::<TestFinalitySyncPipeline>(20, &mut recent_finality_proofs, 2);
assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,);
}
+60
View File
@@ -0,0 +1,60 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! This crate has single entrypoint to run synchronization loop that is built around finality
//! proofs, as opposed to headers synchronization loop, which is built around headers. The headers
//! are still submitted to the target node, but are treated as auxiliary data as we are not trying
//! to submit all source headers to the target node.
pub use crate::finality_loop::{run, FinalitySyncParams, SourceClient, TargetClient};
use std::fmt::Debug;
mod finality_loop;
mod finality_loop_tests;
/// Finality proofs synchronization pipeline.
pub trait FinalitySyncPipeline: Clone + Debug + Send + Sync {
/// Name of the finality proofs source.
const SOURCE_NAME: &'static str;
/// Name of the finality proofs target.
const TARGET_NAME: &'static str;
/// Headers we're syncing are identified by this hash.
type Hash: Eq + Clone + Copy + Send + Sync + Debug;
/// Headers we're syncing are identified by this number.
type Number: relay_utils::BlockNumberBase;
/// Type of header that we're syncing.
type Header: SourceHeader<Self::Number>;
/// Finality proof type.
type FinalityProof: FinalityProof<Self::Number>;
}
/// Header that we're receiving from source node.
pub trait SourceHeader<Number>: Clone + Debug + PartialEq + Send + Sync {
/// Returns number of header.
fn number(&self) -> Number;
/// Returns true if this header needs to be submitted to target node.
fn is_mandatory(&self) -> bool;
}
/// Abstract finality proof that is justifying block finality.
pub trait FinalityProof<Number>: Clone + Send + Sync + Debug {
/// Return header id that this proof is generated for.
///
/// None is returned if proof is invalid from relayer PoV.
fn target_header_number(&self) -> Option<Number>;
}
@@ -57,18 +57,28 @@ impl Default for SyncLoopMetrics {
}
impl SyncLoopMetrics {
/// Update best block number at source.
pub fn update_best_block_at_source<Number: Into<u64>>(&self, source_best_number: Number) {
self.best_block_numbers
.with_label_values(&["source"])
.set(source_best_number.into());
}
/// Update best block number at target.
pub fn update_best_block_at_target<Number: Into<u64>>(&self, target_best_number: Number) {
self.best_block_numbers
.with_label_values(&["target"])
.set(target_best_number.into());
}
/// Update metrics.
pub fn update<P: HeadersSyncPipeline>(&self, sync: &HeadersSync<P>) {
let headers = sync.headers();
let source_best_number = sync.source_best_number().unwrap_or_else(Zero::zero);
let target_best_number = sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero);
self.best_block_numbers
.with_label_values(&["source"])
.set(source_best_number.into());
self.best_block_numbers
.with_label_values(&["target"])
.set(target_best_number.into());
self.update_best_block_at_source(source_best_number);
self.update_best_block_at_target(target_best_number);
self.blocks_in_state
.with_label_values(&["maybe_orphan"])
@@ -18,8 +18,10 @@ rand = "0.7"
# Bridge dependencies
bp-header-chain = { path = "../../primitives/header-chain" }
bp-message-lane = { path = "../../primitives/message-lane" }
bp-runtime = { path = "../../primitives/runtime" }
finality-relay = { path = "../finality-relay" }
headers-relay = { path = "../headers-relay" }
relay-utils = { path = "../utils" }
+10 -8
View File
@@ -23,7 +23,9 @@ use num_traits::{CheckedSub, Zero};
use sp_core::{storage::StorageKey, Pair};
use sp_runtime::{
generic::SignedBlock,
traits::{AtLeast32Bit, Dispatchable, MaybeDisplay, MaybeSerialize, MaybeSerializeDeserialize, Member},
traits::{
AtLeast32Bit, Block as BlockT, Dispatchable, MaybeDisplay, MaybeSerialize, MaybeSerializeDeserialize, Member,
},
Justification,
};
use std::{fmt::Debug, time::Duration};
@@ -51,7 +53,7 @@ pub trait Chain: ChainBase {
+ AtLeast32Bit
+ Copy;
/// Block type.
type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification;
type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification<Self::Header>;
/// The aggregated `Call` type.
type Call: Dispatchable + Debug;
}
@@ -67,7 +69,9 @@ pub trait ChainWithBalances: Chain {
}
/// Block with justification.
pub trait BlockWithJustification {
pub trait BlockWithJustification<Header> {
/// Return block header.
fn header(&self) -> Header;
/// Return block justification, if known.
fn justification(&self) -> Option<&Justification>;
}
@@ -90,13 +94,11 @@ pub trait TransactionSignScheme {
) -> Self::SignedTransaction;
}
impl BlockWithJustification for () {
fn justification(&self) -> Option<&Justification> {
None
impl<Block: BlockT> BlockWithJustification<Block::Header> for SignedBlock<Block> {
fn header(&self) -> Block::Header {
self.block.header().clone()
}
}
impl<Block> BlockWithJustification for SignedBlock<Block> {
fn justification(&self) -> Option<&Justification> {
self.justification.as_ref()
}
@@ -0,0 +1,147 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Default generic implementation of finality source for basic Substrate client.
use crate::chain::{BlockWithJustification, Chain};
use crate::client::Client;
use crate::error::Error;
use crate::sync_header::SyncHeader;
use async_trait::async_trait;
use finality_relay::{FinalityProof, FinalitySyncPipeline, SourceClient, SourceHeader};
use futures::stream::{unfold, Stream, StreamExt};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT;
use std::{marker::PhantomData, pin::Pin};
/// Wrapped raw Justification.
#[derive(Debug, Clone)]
pub struct Justification<Header> {
raw_justification: sp_runtime::Justification,
_phantom: PhantomData<Header>,
}
impl<Header> Justification<Header> {
/// Extract raw justification.
pub fn into_inner(self) -> sp_runtime::Justification {
self.raw_justification
}
}
impl<Header> FinalityProof<Header::Number> for Justification<Header>
where
Header: HeaderT,
{
fn target_header_number(&self) -> Option<Header::Number> {
bp_header_chain::justification::decode_justification_target::<Header>(&self.raw_justification)
.ok()
.map(|(_, number)| number)
}
}
/// Substrate node as finality source.
pub struct FinalitySource<C: Chain, P> {
client: Client<C>,
_phantom: PhantomData<P>,
}
impl<C: Chain, P> FinalitySource<C, P> {
/// Create new headers source using given client.
pub fn new(client: Client<C>) -> Self {
FinalitySource {
client,
_phantom: Default::default(),
}
}
}
impl<C: Chain, P> Clone for FinalitySource<C, P> {
fn clone(&self) -> Self {
FinalitySource {
client: self.client.clone(),
_phantom: Default::default(),
}
}
}
#[async_trait]
impl<C: Chain, P: FinalitySyncPipeline> RelayClient for FinalitySource<C, P> {
type Error = Error;
async fn reconnect(&mut self) -> Result<(), Error> {
self.client.reconnect().await
}
}
#[async_trait]
impl<C, P> SourceClient<P> for FinalitySource<C, P>
where
C: Chain,
C::BlockNumber: relay_utils::BlockNumberBase,
P: FinalitySyncPipeline<
Hash = C::Hash,
Number = C::BlockNumber,
Header = SyncHeader<C::Header>,
FinalityProof = Justification<C::Header>,
>,
P::Header: SourceHeader<C::BlockNumber>,
{
type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::Header>>>>;
async fn best_finalized_block_number(&self) -> Result<P::Number, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because
// target node may be missing proofs that are already available at the source
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
Ok(*finalized_header.number())
}
async fn header_and_finality_proof(
&self,
number: P::Number,
) -> Result<(P::Header, Option<P::FinalityProof>), Error> {
let header_hash = self.client.block_hash_by_number(number).await?;
let signed_block = self.client.get_block(Some(header_hash)).await?;
Ok((
signed_block.header().into(),
signed_block
.justification()
.cloned()
.map(|raw_justification| Justification {
raw_justification,
_phantom: Default::default(),
}),
))
}
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Error> {
Ok(unfold(
self.client.clone().subscribe_justifications().await?,
move |mut subscription| async move {
let next_justification = subscription.next().await?;
Some((
Justification {
raw_justification: next_justification.0,
_phantom: Default::default(),
},
subscription,
))
},
)
.boxed())
}
}
+2 -1
View File
@@ -187,7 +187,8 @@ mod tests {
type AccountId = u32;
type Index = u32;
type SignedBlock = ();
type SignedBlock =
sp_runtime::generic::SignedBlock<sp_runtime::generic::Block<Self::Header, sp_runtime::OpaqueExtrinsic>>;
type Call = ();
}
@@ -24,6 +24,7 @@ mod error;
mod rpc;
mod sync_header;
pub mod finality_source;
pub mod guard;
pub mod headers_source;
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bp_header_chain::find_grandpa_authorities_scheduled_change;
use finality_relay::SourceHeader as FinalitySourceHeader;
use headers_relay::sync_types::SourceHeader;
use num_traits::{CheckedSub, One};
use relay_utils::HeaderId;
@@ -47,7 +49,7 @@ impl<Header> From<Header> for SyncHeader<Header> {
impl<Header: HeaderT> SourceHeader<Header::Hash, Header::Number> for SyncHeader<Header> {
fn id(&self) -> HeaderId<Header::Hash, Header::Number> {
relay_utils::HeaderId(*self.number(), self.hash())
relay_utils::HeaderId(*self.0.number(), self.hash())
}
fn parent_id(&self) -> HeaderId<Header::Hash, Header::Number> {
@@ -59,3 +61,13 @@ impl<Header: HeaderT> SourceHeader<Header::Hash, Header::Number> for SyncHeader<
)
}
}
impl<Header: HeaderT> FinalitySourceHeader<Header::Number> for SyncHeader<Header> {
fn number(&self) -> Header::Number {
*self.0.number()
}
fn is_mandatory(&self) -> bool {
find_grandpa_authorities_scheduled_change(&self.0).is_some()
}
}
+2 -1
View File
@@ -26,12 +26,13 @@ bp-polkadot = { path = "../../primitives/polkadot" }
bp-runtime = { path = "../../primitives/runtime" }
bp-rialto = { path = "../../primitives/rialto" }
bridge-runtime-common = { path = "../../bin/runtime-common" }
finality-relay = { path = "../finality-relay" }
headers-relay = { path = "../headers-relay" }
messages-relay = { path = "../messages-relay" }
millau-runtime = { path = "../../bin/millau/runtime" }
pallet-bridge-call-dispatch = { path = "../../modules/call-dispatch" }
pallet-finality-verifier = { path = "../../modules/finality-verifier" }
pallet-message-lane = { path = "../../modules/message-lane" }
pallet-substrate-bridge = { path = "../../modules/substrate" }
relay-kusama-client = { path = "../kusama-client" }
relay-millau-client = { path = "../millau-client" }
relay-polkadot-client = { path = "../polkadot-client" }
@@ -0,0 +1,130 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate-to-Substrate headers sync entrypoint.
use crate::finality_target::SubstrateFinalityTarget;
use async_trait::async_trait;
use codec::Encode;
use finality_relay::{FinalitySyncParams, FinalitySyncPipeline};
use relay_substrate_client::{
finality_source::{FinalitySource, Justification},
BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, SyncHeader,
};
use relay_utils::BlockNumberBase;
use std::{fmt::Debug, marker::PhantomData, time::Duration};
/// Default synchronization loop timeout.
const STALL_TIMEOUT: Duration = Duration::from_secs(120);
/// Default limit of recent finality proofs.
///
/// Finality delay of 4096 blocks is unlikely to happen in practice in
/// Substrate+GRANDPA based chains (good to know).
const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096;
/// Headers sync pipeline for Substrate <-> Substrate relays.
#[async_trait]
pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline {
/// Name of the runtime method that returns id of best finalized source header at target chain.
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str;
/// Signed transaction type.
type SignedTransaction: Send + Sync + Encode;
/// Make submit header transaction.
async fn make_submit_finality_proof_transaction(
&self,
header: Self::Header,
proof: Self::FinalityProof,
) -> Result<Self::SignedTransaction, SubstrateError>;
}
/// Substrate-to-Substrate finality proof pipeline.
#[derive(Debug, Clone)]
pub struct SubstrateFinalityToSubstrate<SourceChain, TargetChain: Chain, TargetSign> {
/// Client for the target chain.
pub(crate) target_client: Client<TargetChain>,
/// Data required to sign target chain transactions.
pub(crate) target_sign: TargetSign,
/// Unused generic arguments dump.
_marker: PhantomData<SourceChain>,
}
impl<SourceChain, TargetChain: Chain, TargetSign> SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign> {
/// Create new Substrate-to-Substrate headers pipeline.
pub fn new(target_client: Client<TargetChain>, target_sign: TargetSign) -> Self {
SubstrateFinalityToSubstrate {
target_client,
target_sign,
_marker: Default::default(),
}
}
}
impl<SourceChain, TargetChain, TargetSign> FinalitySyncPipeline
for SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>
where
SourceChain: Clone + Chain + Debug,
BlockNumberOf<SourceChain>: BlockNumberBase,
TargetChain: Clone + Chain + Debug,
TargetSign: Clone + Send + Sync + Debug,
{
const SOURCE_NAME: &'static str = SourceChain::NAME;
const TARGET_NAME: &'static str = TargetChain::NAME;
type Hash = HashOf<SourceChain>;
type Number = BlockNumberOf<SourceChain>;
type Header = SyncHeader<SourceChain::Header>;
type FinalityProof = Justification<SourceChain::Header>;
}
/// Run Substrate-to-Substrate finality sync.
pub async fn run<SourceChain, TargetChain, P>(
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) where
P: SubstrateFinalitySyncPipeline<
Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>,
Header = SyncHeader<SourceChain::Header>,
FinalityProof = Justification<SourceChain::Header>,
>,
SourceChain: Clone + Chain,
BlockNumberOf<SourceChain>: BlockNumberBase,
TargetChain: Clone + Chain,
{
log::info!(
target: "bridge",
"Starting {} -> {} finality proof relay",
SourceChain::NAME,
TargetChain::NAME,
);
finality_relay::run(
FinalitySource::new(source_client),
SubstrateFinalityTarget::new(target_client, pipeline),
FinalitySyncParams {
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
stall_timeout: STALL_TIMEOUT,
},
metrics_params,
futures::future::pending(),
);
}
@@ -0,0 +1,91 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate client as Substrate finality proof target. The chain we connect to should have
//! runtime that implements `<BridgedChainName>FinalityApi` to allow bridging with
//! <BridgedName> chain.
use crate::finality_pipeline::SubstrateFinalitySyncPipeline;
use async_trait::async_trait;
use codec::{Decode, Encode};
use finality_relay::TargetClient;
use futures::TryFutureExt;
use relay_substrate_client::{Chain, Client, Error as SubstrateError};
use relay_utils::relay_loop::Client as RelayClient;
use sp_core::Bytes;
/// Substrate client as Substrate finality target.
pub struct SubstrateFinalityTarget<C: Chain, P> {
client: Client<C>,
pipeline: P,
}
impl<C: Chain, P> SubstrateFinalityTarget<C, P> {
/// Create new Substrate headers target.
pub fn new(client: Client<C>, pipeline: P) -> Self {
SubstrateFinalityTarget { client, pipeline }
}
}
impl<C: Chain, P: SubstrateFinalitySyncPipeline> Clone for SubstrateFinalityTarget<C, P> {
fn clone(&self) -> Self {
SubstrateFinalityTarget {
client: self.client.clone(),
pipeline: self.pipeline.clone(),
}
}
}
#[async_trait]
impl<C: Chain, P: SubstrateFinalitySyncPipeline> RelayClient for SubstrateFinalityTarget<C, P> {
type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
self.client.reconnect().await
}
}
#[async_trait]
impl<C, P> TargetClient<P> for SubstrateFinalityTarget<C, P>
where
C: Chain,
P::Number: Decode,
P::Hash: Decode,
P: SubstrateFinalitySyncPipeline,
{
async fn best_finalized_source_block_number(&self) -> Result<P::Number, SubstrateError> {
// we can't continue to relay finality if target node is out of sync, because
// it may have already received (some of) headers that we're going to relay
self.client.ensure_synced().await?;
Ok(crate::messages_source::read_client_state::<C, P::Hash, P::Number>(
&self.client,
P::BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET,
)
.await?
.best_finalized_peer_at_best_self
.0)
}
async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), SubstrateError> {
self.pipeline
.make_submit_finality_proof_transaction(header, proof)
.and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode())))
.await
.map(drop)
}
}
@@ -17,12 +17,12 @@
//! Initialize Substrate -> Substrate headers bridge.
//!
//! Initialization is a transaction that calls `initialize()` function of the
//! `pallet-substrate-bridge` pallet. This transaction brings initial header
//! `pallet-finality-verifier` pallet. This transaction brings initial header
//! and authorities set from source to target chain. The headers sync starts
//! with this header.
use codec::Decode;
use pallet_substrate_bridge::InitializationData;
use pallet_finality_verifier::InitializationData;
use relay_substrate_client::{Chain, Client};
use sp_core::Bytes;
use sp_finality_grandpa::{AuthorityList as GrandpaAuthoritiesSet, SetId as GrandpaAuthoritiesSetId};
@@ -132,10 +132,6 @@ async fn prepare_initialization_data<SourceChain: Chain>(
header: initial_header,
authority_list: initial_authorities_set,
set_id: initial_authorities_set_id.unwrap_or(0),
// There may be multiple scheduled changes, so on real chains we should select proper
// moment, when there's nothing scheduled. On ephemeral (temporary) chains, it is ok to
// start with genesis.
scheduled_change: None,
is_halted: false,
})
}
@@ -1,458 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate-to-Substrate headers synchronization maintain procedure.
//!
//! Regular headers synchronization only depends on persistent justifications
//! that are generated when authorities set changes. This happens rarely on
//! real-word chains. So some other way to finalize headers is required.
//!
//! Full nodes are listening to GRANDPA messages, so they may have track authorities
//! votes on their own. They're returning both persistent and ephemeral justifications
//! (justifications that are not stored in the database and not broadcasted over network)
//! throught `grandpa_subscribeJustifications` RPC subscription.
//!
//! The idea of this maintain procedure is that when we see justification that 'improves'
//! best finalized header on the target chain, we submit this justification to the target
//! node.
use crate::headers_pipeline::SubstrateHeadersSyncPipeline;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use codec::{Decode, Encode};
use futures::future::{poll_fn, FutureExt, TryFutureExt};
use headers_relay::{
sync::HeadersSync,
sync_loop::SyncMaintain,
sync_types::{HeaderIdOf, HeaderStatus},
};
use relay_substrate_client::{Chain, Client, Error as SubstrateError, JustificationsSubscription};
use relay_utils::HeaderId;
use sp_core::Bytes;
use sp_runtime::{traits::Header as HeaderT, Justification};
use std::{collections::VecDeque, marker::PhantomData, task::Poll};
/// Substrate-to-Substrate headers synchronization maintain procedure.
pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain>
{
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
justifications: Arc<Mutex<Justifications<P>>>,
_marker: PhantomData<SourceChain>,
}
/// Future and already received justifications from the source chain.
struct Justifications<P: SubstrateHeadersSyncPipeline> {
/// Justifications stream. None if it hasn't been initialized yet, or it has been dropped
/// by the rpc library.
stream: Option<JustificationsSubscription>,
/// Justifications that we have read from the stream but have not sent to the
/// target node, because their targets were still not synced.
queue: VecDeque<(HeaderIdOf<P>, Justification)>,
}
impl<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain>
SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
{
/// Create new maintain procedure.
pub async fn new(pipeline: P, source_client: Client<SourceChain>, target_client: Client<TargetChain>) -> Self {
let justifications = subscribe_justifications(&source_client).await;
SubstrateHeadersToSubstrateMaintain {
pipeline,
source_client,
target_client,
justifications: Arc::new(Mutex::new(Justifications {
stream: justifications,
queue: VecDeque::new(),
})),
_marker: Default::default(),
}
}
}
#[async_trait]
impl<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain> Clone
for SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
{
fn clone(&self) -> Self {
SubstrateHeadersToSubstrateMaintain {
pipeline: self.pipeline.clone(),
source_client: self.source_client.clone(),
target_client: self.target_client.clone(),
justifications: self.justifications.clone(),
_marker: Default::default(),
}
}
}
#[async_trait]
impl<P, SourceChain, TargetChain> SyncMaintain<P> for SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain>
where
SourceChain: Chain,
<SourceChain::Header as HeaderT>::Number: Into<P::Number>,
<SourceChain::Header as HeaderT>::Hash: Into<P::Hash>,
TargetChain: Chain,
P::Number: Decode,
P::Hash: Decode,
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
async fn maintain(&self, sync: &mut HeadersSync<P>) {
// lock justifications before doing anything else
let mut justifications = match self.justifications.try_lock() {
Some(justifications) => justifications,
None => {
// this should never happen, as we use single-thread executor
log::warn!(target: "bridge", "Failed to acquire {} justifications lock", P::SOURCE_NAME);
return;
}
};
// we need to read best finalized header from the target node to be able to
// choose justification to submit
let best_finalized = match best_finalized_header_id::<P, _>(&self.target_client).await {
Ok(best_finalized) => best_finalized,
Err(error) => {
log::warn!(
target: "bridge",
"Failed to read best finalized {} block from maintain: {:?}",
P::SOURCE_NAME,
error,
);
return;
}
};
log::debug!(
target: "bridge",
"Read best finalized {} block from {}: {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_finalized,
);
// Select justification to submit to the target node. We're submitting at most one justification
// on every maintain call. So maintain rate directly affects finalization rate.
let (resubscribe, justification_to_submit) = poll_fn(|context| {
// read justifications from the stream and push to the queue
let resubscribe = !justifications.read_from_stream::<SourceChain::Header>(context);
// remove all obsolete justifications from the queue
remove_obsolete::<P>(&mut justifications.queue, best_finalized);
// select justification to submit
Poll::Ready((resubscribe, select_justification(&mut justifications.queue, sync)))
})
.await;
// if justifications subscription has been dropped, resubscribe
if resubscribe {
justifications.stream = subscribe_justifications(&self.source_client).await;
}
// finally - submit selected justification
if let Some((target, justification)) = justification_to_submit {
let submit_result = self
.pipeline
.make_complete_header_transaction(target, justification)
.and_then(|tx| self.target_client.submit_extrinsic(Bytes(tx.encode())))
.await;
match submit_result {
Ok(_) => log::debug!(
target: "bridge",
"Submitted justification received over {} subscription. Target: {:?}",
P::SOURCE_NAME,
target,
),
Err(error) => log::warn!(
target: "bridge",
"Failed to submit justification received over {} subscription for {:?}: {:?}",
P::SOURCE_NAME,
target,
error,
),
}
}
}
}
impl<P> Justifications<P>
where
P::Number: Decode,
P::Hash: Decode,
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
/// Read justifications from the subscription stream without blocking.
///
/// Returns `true` if justifications stream is still readable and `false` if it has been
/// dropped by the RPC crate && we need to resubscribe.
#[must_use]
fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>) -> bool
where
SourceHeader: HeaderT,
SourceHeader::Number: Into<P::Number>,
SourceHeader::Hash: Into<P::Hash>,
{
let stream = match self.stream.as_mut() {
Some(stream) => stream,
None => return false,
};
loop {
let maybe_next_justification = stream.next();
futures::pin_mut!(maybe_next_justification);
let maybe_next_justification = maybe_next_justification.poll_unpin(context);
let justification = match maybe_next_justification {
Poll::Ready(justification) => justification,
Poll::Pending => return true,
};
let justification = match justification {
Some(justification) => justification,
None => {
log::warn!(
target: "bridge",
"{} justifications stream has been dropped. Will be trying to resubscribe",
P::SOURCE_NAME,
);
return false;
}
};
// decode justification target
let target = bp_header_chain::justification::decode_justification_target::<SourceHeader>(&justification);
let target = match target {
Ok((target_hash, target_number)) => HeaderId(target_number.into(), target_hash.into()),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to decode justification from {} subscription: {:?}",
P::SOURCE_NAME,
error,
);
continue;
}
};
log::debug!(
target: "bridge",
"Received {} justification over subscription. Target: {:?}",
P::SOURCE_NAME,
target,
);
self.queue.push_back((target, justification.0));
}
}
}
/// Clean queue of all justifications that are justifying already finalized blocks.
fn remove_obsolete<P: SubstrateHeadersSyncPipeline>(
queue: &mut VecDeque<(HeaderIdOf<P>, Justification)>,
best_finalized: HeaderIdOf<P>,
) {
while queue
.front()
.map(|(target, _)| target.0 <= best_finalized.0)
.unwrap_or(false)
{
queue.pop_front();
}
}
/// Select appropriate justification that would improve best finalized block on target node.
///
/// It is assumed that the selected justification will be submitted to the target node. The
/// justification itself and all preceeding justifications are removed from the queue.
fn select_justification<P>(
queue: &mut VecDeque<(HeaderIdOf<P>, Justification)>,
sync: &mut HeadersSync<P>,
) -> Option<(HeaderIdOf<P>, Justification)>
where
P: SubstrateHeadersSyncPipeline<Completion = Justification>,
{
let mut selected_justification = None;
while let Some((target, justification)) = queue.pop_front() {
// if we're waiting for this justification, report it
if sync.headers().requires_completion_data(&target) {
sync.headers_mut().completion_response(&target, Some(justification));
// we won't submit previous justifications as we going to submit justification for
// next header
selected_justification = None;
// we won't submit next justifications as we need to submit previous justifications
// first
break;
}
// if we know that the header is already synced (it is known to the target node), let's
// select it for submission. We still may select better justification on the next iteration.
if sync.headers().status(&target) == HeaderStatus::Synced {
selected_justification = Some((target, justification));
continue;
}
// finally - return justification back to the queue
queue.push_back((target, justification));
break;
}
selected_justification
}
/// Returns best finalized source header on the target chain.
async fn best_finalized_header_id<P, C>(client: &Client<C>) -> Result<HeaderIdOf<P>, SubstrateError>
where
P: SubstrateHeadersSyncPipeline,
P::Number: Decode,
P::Hash: Decode,
C: Chain,
{
let call = P::FINALIZED_BLOCK_METHOD.into();
let data = Bytes(Vec::new());
let encoded_response = client.state_call(call, data, None).await?;
let decoded_response: (P::Number, P::Hash) =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
let best_header_id = HeaderId(decoded_response.0, decoded_response.1);
Ok(best_header_id)
}
/// Subscribe to justifications stream at source node.
async fn subscribe_justifications<C: Chain>(client: &Client<C>) -> Option<JustificationsSubscription> {
match client.subscribe_justifications().await {
Ok(source_justifications) => {
log::debug!(
target: "bridge",
"Successfully (re)subscribed to {} justifications",
C::NAME,
);
Some(source_justifications)
}
Err(error) => {
log::warn!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}",
C::NAME,
error,
);
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::headers_pipeline::sync_params;
use crate::millau_headers_to_rialto::MillauHeadersToRialto;
fn parent_hash(index: u8) -> bp_millau::Hash {
if index == 1 {
Default::default()
} else {
header(index - 1).hash()
}
}
fn header_hash(index: u8) -> bp_millau::Hash {
header(index).hash()
}
fn header(index: u8) -> bp_millau::Header {
bp_millau::Header::new(
index as _,
Default::default(),
Default::default(),
parent_hash(index),
Default::default(),
)
}
#[test]
fn obsolete_justifications_are_removed() {
let mut queue = vec![
(HeaderId(1, header_hash(1)), vec![1]),
(HeaderId(2, header_hash(2)), vec![2]),
(HeaderId(3, header_hash(3)), vec![3]),
]
.into_iter()
.collect();
remove_obsolete::<MillauHeadersToRialto>(&mut queue, HeaderId(2, header_hash(2)));
assert_eq!(
queue,
vec![(HeaderId(3, header_hash(3)), vec![3])]
.into_iter()
.collect::<VecDeque<_>>(),
);
}
#[test]
fn latest_justification_is_selected() {
let mut queue = vec![
(HeaderId(1, header_hash(1)), vec![1]),
(HeaderId(2, header_hash(2)), vec![2]),
(HeaderId(3, header_hash(3)), vec![3]),
]
.into_iter()
.collect();
let mut sync = HeadersSync::<MillauHeadersToRialto>::new(sync_params());
sync.headers_mut().header_response(header(1).into());
sync.headers_mut().header_response(header(2).into());
sync.headers_mut().header_response(header(3).into());
sync.target_best_header_response(HeaderId(2, header_hash(2)));
assert_eq!(
select_justification(&mut queue, &mut sync),
Some((HeaderId(2, header_hash(2)), vec![2])),
);
}
#[test]
fn required_justification_is_reported() {
let mut queue = vec![
(HeaderId(1, header_hash(1)), vec![1]),
(HeaderId(2, header_hash(2)), vec![2]),
(HeaderId(3, header_hash(3)), vec![3]),
]
.into_iter()
.collect();
let mut sync = HeadersSync::<MillauHeadersToRialto>::new(sync_params());
sync.headers_mut().header_response(header(1).into());
sync.headers_mut().header_response(header(2).into());
sync.headers_mut().header_response(header(3).into());
sync.headers_mut()
.incomplete_headers_response(vec![HeaderId(2, header_hash(2))].into_iter().collect());
sync.target_best_header_response(HeaderId(2, header_hash(2)));
assert_eq!(sync.headers_mut().header_to_complete(), None,);
assert_eq!(select_justification(&mut queue, &mut sync), None,);
assert_eq!(
sync.headers_mut().header_to_complete(),
Some((HeaderId(2, header_hash(2)), &vec![2])),
);
}
}
@@ -1,166 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate-to-Substrate headers sync entrypoint.
use crate::{headers_maintain::SubstrateHeadersToSubstrateMaintain, headers_target::SubstrateHeadersTarget};
use async_trait::async_trait;
use codec::Encode;
use headers_relay::{
sync::{HeadersSyncParams, TargetTransactionMode},
sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SourceHeader},
};
use relay_substrate_client::{
headers_source::HeadersSource, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
};
use relay_utils::BlockNumberBase;
use sp_runtime::Justification;
use std::marker::PhantomData;
/// Headers sync pipeline for Substrate <-> Substrate relays.
#[async_trait]
pub trait SubstrateHeadersSyncPipeline: HeadersSyncPipeline {
/// Name of the `best_block` runtime method.
const BEST_BLOCK_METHOD: &'static str;
/// Name of the `finalized_block` runtime method.
const FINALIZED_BLOCK_METHOD: &'static str;
/// Name of the `is_known_block` runtime method.
const IS_KNOWN_BLOCK_METHOD: &'static str;
/// Name of the `incomplete_headers` runtime method.
const INCOMPLETE_HEADERS_METHOD: &'static str;
/// Signed transaction type.
type SignedTransaction: Send + Sync + Encode;
/// Make submit header transaction.
async fn make_submit_header_transaction(
&self,
header: QueuedHeader<Self>,
) -> Result<Self::SignedTransaction, SubstrateError>;
/// Make completion transaction for the header.
async fn make_complete_header_transaction(
&self,
id: HeaderIdOf<Self>,
completion: Justification,
) -> Result<Self::SignedTransaction, SubstrateError>;
}
/// Substrate-to-Substrate headers pipeline.
#[derive(Debug, Clone)]
pub struct SubstrateHeadersToSubstrate<SourceChain, SourceSyncHeader, TargetChain: Chain, TargetSign> {
/// Client for the target chain.
pub(crate) target_client: Client<TargetChain>,
/// Data required to sign target chain transactions.
pub(crate) target_sign: TargetSign,
/// Unused generic arguments dump.
_marker: PhantomData<(SourceChain, SourceSyncHeader)>,
}
impl<SourceChain, SourceSyncHeader, TargetChain: Chain, TargetSign>
SubstrateHeadersToSubstrate<SourceChain, SourceSyncHeader, TargetChain, TargetSign>
{
/// Create new Substrate-to-Substrate headers pipeline.
pub fn new(target_client: Client<TargetChain>, target_sign: TargetSign) -> Self {
SubstrateHeadersToSubstrate {
target_client,
target_sign,
_marker: Default::default(),
}
}
}
impl<SourceChain, SourceSyncHeader, TargetChain, TargetSign> HeadersSyncPipeline
for SubstrateHeadersToSubstrate<SourceChain, SourceSyncHeader, TargetChain, TargetSign>
where
SourceChain: Clone + Chain,
BlockNumberOf<SourceChain>: BlockNumberBase,
SourceSyncHeader:
SourceHeader<HashOf<SourceChain>, BlockNumberOf<SourceChain>> + std::ops::Deref<Target = SourceChain::Header>,
TargetChain: Clone + Chain,
TargetSign: Clone + Send + Sync,
{
const SOURCE_NAME: &'static str = SourceChain::NAME;
const TARGET_NAME: &'static str = TargetChain::NAME;
type Hash = HashOf<SourceChain>;
type Number = BlockNumberOf<SourceChain>;
type Header = SourceSyncHeader;
type Extra = ();
type Completion = Justification;
fn estimate_size(source: &QueuedHeader<Self>) -> usize {
source.header().encode().len()
}
}
/// Return sync parameters for Substrate-to-Substrate headers sync.
pub fn sync_params() -> HeadersSyncParams {
HeadersSyncParams {
max_future_headers_to_download: 32,
max_headers_in_submitted_status: 8,
max_headers_in_single_submit: 1,
max_headers_size_in_single_submit: 1024 * 1024,
prune_depth: 256,
target_tx_mode: TargetTransactionMode::Signed,
}
}
/// Run Substrate-to-Substrate headers sync.
pub async fn run<SourceChain, TargetChain, P>(
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) where
P: SubstrateHeadersSyncPipeline<
Hash = HashOf<SourceChain>,
Number = BlockNumberOf<SourceChain>,
Completion = Justification,
Extra = (),
>,
P::Header: SourceHeader<HashOf<SourceChain>, BlockNumberOf<SourceChain>>,
SourceChain: Clone + Chain,
SourceChain::Header: Into<P::Header>,
BlockNumberOf<SourceChain>: BlockNumberBase,
TargetChain: Clone + Chain,
{
let sync_maintain = SubstrateHeadersToSubstrateMaintain::<_, SourceChain, _>::new(
pipeline.clone(),
source_client.clone(),
target_client.clone(),
)
.await;
log::info!(
target: "bridge",
"Starting {} -> {} headers relay",
SourceChain::NAME,
TargetChain::NAME,
);
headers_relay::sync_loop::run(
HeadersSource::new(source_client),
SourceChain::AVERAGE_BLOCK_INTERVAL,
SubstrateHeadersTarget::new(target_client, pipeline),
TargetChain::AVERAGE_BLOCK_INTERVAL,
sync_maintain,
sync_params(),
metrics_params,
futures::future::pending(),
);
}
@@ -1,168 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate client as Substrate headers target. The chain we connect to should have
//! runtime that implements `<BridgedChainName>HeaderApi` to allow bridging with
//! <BridgedName> chain.
use crate::headers_pipeline::SubstrateHeadersSyncPipeline;
use async_trait::async_trait;
use codec::{Decode, Encode};
use futures::TryFutureExt;
use headers_relay::{
sync_loop::TargetClient,
sync_types::{HeaderIdOf, QueuedHeader, SubmittedHeaders},
};
use relay_substrate_client::{Chain, Client, Error as SubstrateError};
use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
use sp_core::Bytes;
use sp_runtime::Justification;
use std::collections::HashSet;
/// Substrate client as Substrate headers target.
pub struct SubstrateHeadersTarget<C: Chain, P> {
client: Client<C>,
pipeline: P,
}
impl<C: Chain, P> SubstrateHeadersTarget<C, P> {
/// Create new Substrate headers target.
pub fn new(client: Client<C>, pipeline: P) -> Self {
SubstrateHeadersTarget { client, pipeline }
}
}
impl<C: Chain, P: SubstrateHeadersSyncPipeline> Clone for SubstrateHeadersTarget<C, P> {
fn clone(&self) -> Self {
SubstrateHeadersTarget {
client: self.client.clone(),
pipeline: self.pipeline.clone(),
}
}
}
#[async_trait]
impl<C: Chain, P: SubstrateHeadersSyncPipeline> RelayClient for SubstrateHeadersTarget<C, P> {
type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
self.client.reconnect().await
}
}
#[async_trait]
impl<C, P> TargetClient<P> for SubstrateHeadersTarget<C, P>
where
C: Chain,
P::Number: Decode,
P::Hash: Decode + Encode,
P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>,
{
async fn best_header_id(&self) -> Result<HeaderIdOf<P>, SubstrateError> {
// we can't continue to relay headers if target node is out of sync, because
// it may have already received (some of) headers that we're going to relay
self.client.ensure_synced().await?;
let call = P::BEST_BLOCK_METHOD.into();
let data = Bytes(Vec::new());
let encoded_response = self.client.state_call(call, data, None).await?;
let decoded_response: Vec<(P::Number, P::Hash)> =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
// If we parse an empty list of headers it means that bridge pallet has not been initalized
// yet. Otherwise we expect to always have at least one header.
decoded_response
.last()
.ok_or(SubstrateError::UninitializedBridgePallet)
.map(|(num, hash)| HeaderId(*num, *hash))
}
async fn is_known_header(&self, id: HeaderIdOf<P>) -> Result<(HeaderIdOf<P>, bool), SubstrateError> {
let call = P::IS_KNOWN_BLOCK_METHOD.into();
let data = Bytes(id.1.encode());
let encoded_response = self.client.state_call(call, data, None).await?;
let is_known_block: bool =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
Ok((id, is_known_block))
}
async fn submit_headers(
&self,
mut headers: Vec<QueuedHeader<P>>,
) -> SubmittedHeaders<HeaderIdOf<P>, SubstrateError> {
debug_assert_eq!(
headers.len(),
1,
"Substrate pallet only supports single header / transaction"
);
let header = headers.remove(0);
let id = header.id();
let submit_transaction_result = self
.pipeline
.make_submit_header_transaction(header)
.and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode())))
.await;
match submit_transaction_result {
Ok(_) => SubmittedHeaders {
submitted: vec![id],
incomplete: Vec::new(),
rejected: Vec::new(),
fatal_error: None,
},
Err(error) => SubmittedHeaders {
submitted: Vec::new(),
incomplete: Vec::new(),
rejected: vec![id],
fatal_error: Some(error),
},
}
}
async fn incomplete_headers_ids(&self) -> Result<HashSet<HeaderIdOf<P>>, SubstrateError> {
let call = P::INCOMPLETE_HEADERS_METHOD.into();
let data = Bytes(Vec::new());
let encoded_response = self.client.state_call(call, data, None).await?;
let decoded_response: Vec<(P::Number, P::Hash)> =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
let incomplete_headers = decoded_response
.into_iter()
.map(|(number, hash)| HeaderId(number, hash))
.collect();
Ok(incomplete_headers)
}
async fn complete_header(
&self,
id: HeaderIdOf<P>,
completion: Justification,
) -> Result<HeaderIdOf<P>, SubstrateError> {
let tx = self.pipeline.make_complete_header_transaction(id, completion).await?;
self.client.submit_extrinsic(Bytes(tx.encode())).await?;
Ok(id)
}
async fn requires_extra(&self, header: QueuedHeader<P>) -> Result<(HeaderIdOf<P>, bool), SubstrateError> {
Ok((header.id(), false))
}
}
+4 -5
View File
@@ -38,10 +38,9 @@ pub type MillauClient = relay_substrate_client::Client<Millau>;
pub type RialtoClient = relay_substrate_client::Client<Rialto>;
mod cli;
mod finality_pipeline;
mod finality_target;
mod headers_initialize;
mod headers_maintain;
mod headers_pipeline;
mod headers_target;
mod messages_lane;
mod messages_source;
mod messages_target;
@@ -101,7 +100,7 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> {
&rialto_sign.signer,
rialto_signer_next_index,
rialto_runtime::SudoCall::sudo(Box::new(
rialto_runtime::BridgeMillauCall::initialize(initialization_data).into(),
rialto_runtime::FinalityBridgeMillauCall::initialize(initialization_data).into(),
))
.into(),
)
@@ -137,7 +136,7 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> {
&millau_sign.signer,
millau_signer_next_index,
millau_runtime::SudoCall::sudo(Box::new(
millau_runtime::BridgeRialtoCall::initialize(initialization_data).into(),
millau_runtime::FinalityBridgeRialtoCall::initialize(initialization_data).into(),
))
.into(),
)
@@ -1,4 +1,4 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
@@ -17,70 +17,52 @@
//! Millau-to-Rialto headers sync entrypoint.
use crate::{
headers_pipeline::{SubstrateHeadersSyncPipeline, SubstrateHeadersToSubstrate},
finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate},
MillauClient, RialtoClient,
};
use async_trait::async_trait;
use bp_millau::{
BEST_MILLAU_BLOCKS_METHOD, FINALIZED_MILLAU_BLOCK_METHOD, INCOMPLETE_MILLAU_HEADERS_METHOD,
IS_KNOWN_MILLAU_BLOCK_METHOD,
};
use headers_relay::sync_types::QueuedHeader;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SyncHeader as MillauSyncHeader};
use relay_rialto_client::{BridgeMillauCall, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{Error as SubstrateError, TransactionSignScheme};
use relay_millau_client::{Millau, SyncHeader as MillauSyncHeader};
use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme};
use sp_core::Pair;
use sp_runtime::Justification;
/// Millau-to-Rialto headers sync pipeline.
pub(crate) type MillauHeadersToRialto =
SubstrateHeadersToSubstrate<Millau, MillauSyncHeader, Rialto, RialtoSigningParams>;
/// Millau header in-the-queue.
type QueuedMillauHeader = QueuedHeader<MillauHeadersToRialto>;
/// Millau-to-Rialto finality sync pipeline.
pub(crate) type MillauFinalityToRialto = SubstrateFinalityToSubstrate<Millau, Rialto, RialtoSigningParams>;
#[async_trait]
impl SubstrateHeadersSyncPipeline for MillauHeadersToRialto {
const BEST_BLOCK_METHOD: &'static str = BEST_MILLAU_BLOCKS_METHOD;
const FINALIZED_BLOCK_METHOD: &'static str = FINALIZED_MILLAU_BLOCK_METHOD;
const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_MILLAU_BLOCK_METHOD;
const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_MILLAU_HEADERS_METHOD;
impl SubstrateFinalitySyncPipeline for MillauFinalityToRialto {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD;
type SignedTransaction = <Rialto as TransactionSignScheme>::SignedTransaction;
async fn make_submit_header_transaction(
async fn make_submit_finality_proof_transaction(
&self,
header: QueuedMillauHeader,
header: MillauSyncHeader,
proof: Justification<bp_millau::Header>,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
let call = BridgeMillauCall::import_signed_header(header.header().clone().into_inner()).into();
let transaction = Rialto::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call);
Ok(transaction)
}
async fn make_complete_header_transaction(
&self,
id: MillauHeaderId,
completion: Justification,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
let call = BridgeMillauCall::finalize_header(id.1, completion).into();
let call = rialto_runtime::FinalityBridgeMillauCall::submit_finality_proof(
header.into_inner(),
proof.into_inner(),
(),
)
.into();
let transaction = Rialto::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call);
Ok(transaction)
}
}
/// Run Millau-to-Rialto headers sync.
/// Run Millau-to-Rialto finality sync.
pub async fn run(
millau_client: MillauClient,
rialto_client: RialtoClient,
rialto_sign: RialtoSigningParams,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) {
crate::headers_pipeline::run(
MillauHeadersToRialto::new(rialto_client.clone(), rialto_sign),
crate::finality_pipeline::run(
MillauFinalityToRialto::new(rialto_client.clone(), rialto_sign),
millau_client,
rialto_client,
metrics_params,
@@ -51,8 +51,8 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
bp_millau::FROM_MILLAU_LATEST_CONFIRMED_NONCE_METHOD;
const INBOUND_LANE_UNREWARDED_RELAYERS_STATE: &'static str = bp_millau::FROM_MILLAU_UNREWARDED_RELAYERS_STATE;
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::FINALIZED_MILLAU_BLOCK_METHOD;
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_rialto::FINALIZED_RIALTO_BLOCK_METHOD;
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD;
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD;
type SourceSignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;
type TargetSignedTransaction = <Rialto as TransactionSignScheme>::SignedTransaction;
@@ -17,69 +17,52 @@
//! Rialto-to-Millau headers sync entrypoint.
use crate::{
headers_pipeline::{SubstrateHeadersSyncPipeline, SubstrateHeadersToSubstrate},
finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate},
MillauClient, RialtoClient,
};
use async_trait::async_trait;
use bp_rialto::{
BEST_RIALTO_BLOCKS_METHOD, FINALIZED_RIALTO_BLOCK_METHOD, INCOMPLETE_RIALTO_HEADERS_METHOD,
IS_KNOWN_RIALTO_BLOCK_METHOD,
};
use headers_relay::sync_types::QueuedHeader;
use relay_millau_client::{BridgeRialtoCall, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SyncHeader as RialtoSyncHeader};
use relay_substrate_client::{Error as SubstrateError, TransactionSignScheme};
use relay_millau_client::{Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{Rialto, SyncHeader as RialtoSyncHeader};
use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme};
use sp_core::Pair;
use sp_runtime::Justification;
/// Rialto-to-Millau headers sync pipeline.
type RialtoHeadersToMillau = SubstrateHeadersToSubstrate<Rialto, RialtoSyncHeader, Millau, MillauSigningParams>;
/// Rialto header in-the-queue.
type QueuedRialtoHeader = QueuedHeader<RialtoHeadersToMillau>;
/// Rialto-to-Millau finality sync pipeline.
pub(crate) type RialtoFinalityToMillau = SubstrateFinalityToSubstrate<Rialto, Millau, MillauSigningParams>;
#[async_trait]
impl SubstrateHeadersSyncPipeline for RialtoHeadersToMillau {
const BEST_BLOCK_METHOD: &'static str = BEST_RIALTO_BLOCKS_METHOD;
const FINALIZED_BLOCK_METHOD: &'static str = FINALIZED_RIALTO_BLOCK_METHOD;
const IS_KNOWN_BLOCK_METHOD: &'static str = IS_KNOWN_RIALTO_BLOCK_METHOD;
const INCOMPLETE_HEADERS_METHOD: &'static str = INCOMPLETE_RIALTO_HEADERS_METHOD;
impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD;
type SignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;
async fn make_submit_header_transaction(
async fn make_submit_finality_proof_transaction(
&self,
header: QueuedRialtoHeader,
header: RialtoSyncHeader,
proof: Justification<bp_rialto::Header>,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
let call = BridgeRialtoCall::import_signed_header(header.header().clone().into_inner()).into();
let transaction = Millau::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call);
Ok(transaction)
}
async fn make_complete_header_transaction(
&self,
id: RialtoHeaderId,
completion: Justification,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
let call = BridgeRialtoCall::finalize_header(id.1, completion).into();
let call = millau_runtime::FinalityBridgeRialtoCall::submit_finality_proof(
header.into_inner(),
proof.into_inner(),
(),
)
.into();
let transaction = Millau::sign_transaction(&self.target_client, &self.target_sign.signer, nonce, call);
Ok(transaction)
}
}
/// Run Rialto-to-Millau headers sync.
/// Run Rialto-to-Millau finality sync.
pub async fn run(
rialto_client: RialtoClient,
millau_client: MillauClient,
millau_sign: MillauSigningParams,
metrics_params: Option<relay_utils::metrics::MetricsParams>,
) {
crate::headers_pipeline::run(
RialtoHeadersToMillau::new(millau_client.clone(), millau_sign),
crate::finality_pipeline::run(
RialtoFinalityToMillau::new(millau_client.clone(), millau_sign),
rialto_client,
millau_client,
metrics_params,
@@ -51,8 +51,8 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
bp_rialto::FROM_RIALTO_LATEST_CONFIRMED_NONCE_METHOD;
const INBOUND_LANE_UNREWARDED_RELAYERS_STATE: &'static str = bp_rialto::FROM_RIALTO_UNREWARDED_RELAYERS_STATE;
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::FINALIZED_RIALTO_BLOCK_METHOD;
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_millau::FINALIZED_MILLAU_BLOCK_METHOD;
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD;
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD;
type SourceSignedTransaction = <Rialto as TransactionSignScheme>::SignedTransaction;
type TargetSignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;