Batch transactions in complex relays (#1669)

* batch transactions in message relay: API prototype

* get rid of Box<dyn BatchTransaction> and actually submit it

* test batch transactions

* message_lane_loop_works_with_batch_transactions

* removed logger

* BatchConfirmationTransaction + BatchDeliveryTransaction

* more prototyping

* fmt

* continue with batch calls

* impl BatchCallBuilder for ()

* BatchDeliveryTransaction impl

* BundledBatchCallBuilder

* proper impl of BundledBatchCallBuilder + use it in RialtoParachain -> Millau

* impl prove_header in OnDemandHeadersRelay

* impl OnDemandParachainsRelay::prove_header (needs extensive tests)

* added a couple of TODOs

* return Result<Option<BatchTx>> when asking for more headers

* prove headers when require_* is called && return proper headers from required_header_id

* split parachains::prove_header and test select_headers_to_prove

* more traces and leave TODOs

* use finality stream in SubstrateFinalitySource::prove_block_finality

* prove parachain head at block, selected by headers relay

* const ANCIENT_BLOCK_THRESHOLD

* TODO -> proof

* clippy and spelling

* BatchCallBuilder::build_batch_call() returns Result

* read first proof from two streams

* FailedToFindFinalityProof -> FinalityProofNotFound

* changed select_headers_to_prove to version from PR review
This commit is contained in:
Svyatoslav Nikolsky
2022-12-16 15:04:36 +03:00
committed by Bastian Köcher
parent a732a04ed4
commit be27bd5e97
28 changed files with 1190 additions and 152 deletions
@@ -59,4 +59,7 @@ impl SubstrateMessageLane for BridgeHubRococoMessagesToBridgeHubWococoMessageLan
BridgeHubRococoMessagesToBridgeHubWococoMessageLaneReceiveMessagesProofCallBuilder;
type ReceiveMessagesDeliveryProofCallBuilder =
BridgeHubRococoMessagesToBridgeHubWococoMessageLaneReceiveMessagesDeliveryProofCallBuilder;
type SourceBatchCallBuilder = ();
type TargetBatchCallBuilder = ();
}
@@ -59,4 +59,7 @@ impl SubstrateMessageLane for BridgeHubWococoMessagesToBridgeHubRococoMessageLan
BridgeHubWococoMessagesToBridgeHubRococoMessageLaneReceiveMessagesProofCallBuilder;
type ReceiveMessagesDeliveryProofCallBuilder =
BridgeHubWococoMessagesToBridgeHubRococoMessageLaneReceiveMessagesDeliveryProofCallBuilder;
type SourceBatchCallBuilder = ();
type TargetBatchCallBuilder = ();
}
@@ -41,4 +41,7 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
millau_runtime::Runtime,
millau_runtime::WithRialtoMessagesInstance,
>;
type SourceBatchCallBuilder = ();
type TargetBatchCallBuilder = ();
}
@@ -41,4 +41,7 @@ impl SubstrateMessageLane for MillauMessagesToRialtoParachain {
millau_runtime::Runtime,
millau_runtime::WithRialtoParachainMessagesInstance,
>;
type SourceBatchCallBuilder = ();
type TargetBatchCallBuilder = ();
}
@@ -41,4 +41,7 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
rialto_runtime::Runtime,
rialto_runtime::WithMillauMessagesInstance,
>;
type SourceBatchCallBuilder = ();
type TargetBatchCallBuilder = ();
}
@@ -18,9 +18,12 @@
use relay_millau_client::Millau;
use relay_rialto_parachain_client::RialtoParachain;
use substrate_relay_helper::messages_lane::{
DirectReceiveMessagesDeliveryProofCallBuilder, DirectReceiveMessagesProofCallBuilder,
SubstrateMessageLane,
use substrate_relay_helper::{
messages_lane::{
DirectReceiveMessagesDeliveryProofCallBuilder, DirectReceiveMessagesProofCallBuilder,
SubstrateMessageLane,
},
BundledBatchCallBuilder,
};
/// Description of RialtoParachain -> Millau messages bridge.
@@ -41,4 +44,7 @@ impl SubstrateMessageLane for RialtoParachainMessagesToMillau {
rialto_parachain_runtime::Runtime,
rialto_parachain_runtime::WithMillauMessagesInstance,
>;
type SourceBatchCallBuilder = ();
type TargetBatchCallBuilder = BundledBatchCallBuilder<millau_runtime::Runtime>;
}
@@ -59,7 +59,7 @@ use crate::{
declare_chain_cli_schema,
};
use bp_messages::LaneId;
use bp_runtime::{BalanceOf, BlockNumberOf};
use bp_runtime::BalanceOf;
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, Chain, ChainWithBalances, ChainWithTransactions, Client,
Parachain,
@@ -167,8 +167,8 @@ where
/// Returns message relay parameters.
fn messages_relay_params(
&self,
source_to_target_headers_relay: Arc<dyn OnDemandRelay<BlockNumberOf<Source>>>,
target_to_source_headers_relay: Arc<dyn OnDemandRelay<BlockNumberOf<Target>>>,
source_to_target_headers_relay: Arc<dyn OnDemandRelay<Source, Target>>,
target_to_source_headers_relay: Arc<dyn OnDemandRelay<Target, Source>>,
lane_id: LaneId,
) -> MessagesRelayParams<Bridge::MessagesLane> {
MessagesRelayParams {
@@ -243,8 +243,8 @@ trait Full2WayBridgeBase: Sized + Send + Sync {
async fn start_on_demand_headers_relayers(
&mut self,
) -> anyhow::Result<(
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Left>>>,
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Right>>>,
Arc<dyn OnDemandRelay<Self::Left, Self::Right>>,
Arc<dyn OnDemandRelay<Self::Right, Self::Left>>,
)>;
}
@@ -23,7 +23,6 @@ use crate::cli::{
CliChain,
};
use bp_polkadot_core::parachains::ParaHash;
use bp_runtime::BlockNumberOf;
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, Chain, ChainWithTransactions, Client, Parachain,
@@ -215,8 +214,8 @@ where
async fn start_on_demand_headers_relayers(
&mut self,
) -> anyhow::Result<(
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Left>>>,
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Right>>>,
Arc<dyn OnDemandRelay<Self::Left, Self::Right>>,
Arc<dyn OnDemandRelay<Self::Right, Self::Left>>,
)> {
self.common.left.accounts.push(TaggedAccount::Headers {
id: self.right_headers_to_left_transaction_params.signer.public().into(),
@@ -249,31 +248,31 @@ where
.await?;
let left_relay_to_right_on_demand_headers =
OnDemandHeadersRelay::new::<<L2R as ParachainToRelayHeadersCliBridge>::RelayFinality>(
OnDemandHeadersRelay::<<L2R as ParachainToRelayHeadersCliBridge>::RelayFinality>::new(
self.left_relay.clone(),
self.common.right.client.clone(),
self.left_headers_to_right_transaction_params.clone(),
self.common.shared.only_mandatory_headers,
);
let right_relay_to_left_on_demand_headers =
OnDemandHeadersRelay::new::<<R2L as ParachainToRelayHeadersCliBridge>::RelayFinality>(
OnDemandHeadersRelay::<<R2L as ParachainToRelayHeadersCliBridge>::RelayFinality>::new(
self.right_relay.clone(),
self.common.left.client.clone(),
self.right_headers_to_left_transaction_params.clone(),
self.common.shared.only_mandatory_headers,
);
let left_to_right_on_demand_parachains = OnDemandParachainsRelay::new::<
let left_to_right_on_demand_parachains = OnDemandParachainsRelay::<
<L2R as ParachainToRelayHeadersCliBridge>::ParachainFinality,
>(
>::new(
self.left_relay.clone(),
self.common.right.client.clone(),
self.left_parachains_to_right_transaction_params.clone(),
Arc::new(left_relay_to_right_on_demand_headers),
);
let right_to_left_on_demand_parachains = OnDemandParachainsRelay::new::<
let right_to_left_on_demand_parachains = OnDemandParachainsRelay::<
<R2L as ParachainToRelayHeadersCliBridge>::ParachainFinality,
>(
>::new(
self.right_relay.clone(),
self.common.left.client.clone(),
self.right_parachains_to_left_transaction_params.clone(),
@@ -26,7 +26,6 @@ use crate::cli::{
CliChain,
};
use bp_polkadot_core::parachains::ParaHash;
use bp_runtime::BlockNumberOf;
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, Chain, ChainWithTransactions, Client, Parachain,
@@ -199,8 +198,8 @@ where
async fn start_on_demand_headers_relayers(
&mut self,
) -> anyhow::Result<(
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Left>>>,
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Right>>>,
Arc<dyn OnDemandRelay<Self::Left, Self::Right>>,
Arc<dyn OnDemandRelay<Self::Right, Self::Left>>,
)> {
self.common.left.accounts.push(TaggedAccount::Headers {
id: self.right_headers_to_left_transaction_params.signer.public().into(),
@@ -229,22 +228,22 @@ where
.await?;
let left_to_right_on_demand_headers =
OnDemandHeadersRelay::new::<<L2R as RelayToRelayHeadersCliBridge>::Finality>(
OnDemandHeadersRelay::<<L2R as RelayToRelayHeadersCliBridge>::Finality>::new(
self.common.left.client.clone(),
self.common.right.client.clone(),
self.left_headers_to_right_transaction_params.clone(),
self.common.shared.only_mandatory_headers,
);
let right_relay_to_left_on_demand_headers =
OnDemandHeadersRelay::new::<<R2L as ParachainToRelayHeadersCliBridge>::RelayFinality>(
OnDemandHeadersRelay::<<R2L as ParachainToRelayHeadersCliBridge>::RelayFinality>::new(
self.right_relay.clone(),
self.common.left.client.clone(),
self.right_headers_to_left_transaction_params.clone(),
self.common.shared.only_mandatory_headers,
);
let right_to_left_on_demand_parachains = OnDemandParachainsRelay::new::<
let right_to_left_on_demand_parachains = OnDemandParachainsRelay::<
<R2L as ParachainToRelayHeadersCliBridge>::ParachainFinality,
>(
>::new(
self.right_relay.clone(),
self.common.left.client.clone(),
self.right_parachains_to_left_transaction_params.clone(),
@@ -22,7 +22,6 @@ use crate::cli::{
relay_headers_and_messages::{Full2WayBridgeBase, Full2WayBridgeCommonParams},
CliChain,
};
use bp_runtime::BlockNumberOf;
use relay_substrate_client::{AccountIdOf, AccountKeyPairOf, ChainWithTransactions};
use sp_core::Pair;
use substrate_relay_helper::{
@@ -149,8 +148,8 @@ where
async fn start_on_demand_headers_relayers(
&mut self,
) -> anyhow::Result<(
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Left>>>,
Arc<dyn OnDemandRelay<BlockNumberOf<Self::Right>>>,
Arc<dyn OnDemandRelay<Self::Left, Self::Right>>,
Arc<dyn OnDemandRelay<Self::Right, Self::Left>>,
)> {
self.common.right.accounts.push(TaggedAccount::Headers {
id: self.left_to_right_transaction_params.signer.public().into(),
@@ -175,14 +174,14 @@ where
.await?;
let left_to_right_on_demand_headers =
OnDemandHeadersRelay::new::<<L2R as RelayToRelayHeadersCliBridge>::Finality>(
OnDemandHeadersRelay::<<L2R as RelayToRelayHeadersCliBridge>::Finality>::new(
self.common.left.client.clone(),
self.common.right.client.clone(),
self.left_to_right_transaction_params.clone(),
self.common.shared.only_mandatory_headers,
);
let right_to_left_on_demand_headers =
OnDemandHeadersRelay::new::<<R2L as RelayToRelayHeadersCliBridge>::Finality>(
OnDemandHeadersRelay::<<R2L as RelayToRelayHeadersCliBridge>::Finality>::new(
self.common.right.client.clone(),
self.common.left.client.clone(),
self.right_to_left_transaction_params.clone(),
@@ -21,6 +21,7 @@ thiserror = "1.0.26"
bp-header-chain = { path = "../../primitives/header-chain" }
bp-messages = { path = "../../primitives/messages" }
bp-polkadot-core = { path = "../../primitives/polkadot-core" }
bp-runtime = { path = "../../primitives/runtime" }
pallet-bridge-messages = { path = "../../modules/messages" }
finality-relay = { path = "../finality" }
@@ -57,6 +57,18 @@ const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction";
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// The difference between best block number and number of its ancestor, that is enough
/// for us to consider that ancestor an "ancient" block with dropped state.
///
/// The relay does not assume that it is connected to the archive node, so it always tries
/// to use the best available chain state. But sometimes it still may use state of some
/// old block. If the state of that block is already dropped, relay will see errors when
/// e.g. it tries to prove something.
///
/// By default Substrate-based nodes are storing state for last 256 blocks. We'll use
/// half of this value.
pub const ANCIENT_BLOCK_THRESHOLD: u32 = 128;
/// Opaque justifications subscription type.
pub struct Subscription<T>(pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>);
@@ -16,6 +16,7 @@
//! Substrate node RPC errors.
use bp_polkadot_core::parachains::ParaId;
use jsonrpsee::core::Error as RpcError;
use relay_utils::MaybeConnectionError;
use sc_rpc_api::system::Health;
@@ -45,6 +46,12 @@ pub enum Error {
/// Runtime storage is missing some mandatory value.
#[error("Mandatory storage value is missing from the runtime storage.")]
MissingMandatoryStorageValue,
/// Required parachain head is not present at the relay chain.
#[error("Parachain {0:?} head {1} is missing from the relay chain storage.")]
MissingRequiredParachainHead(ParaId, u64),
/// Failed to find finality proof for the given header.
#[error("Failed to find finality proof for header {0}.")]
FinalityProofNotFound(u64),
/// The client we're connected to is not synced, so we can't rely on its state.
#[error("Substrate client is not synced {0}.")]
ClientNotSynced(Health),
+4 -1
View File
@@ -37,7 +37,10 @@ pub use crate::{
ChainWithGrandpa, ChainWithMessages, ChainWithTransactions, Parachain, RelayChain,
SignParam, TransactionStatusOf, UnsignedTransaction,
},
client::{ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription},
client::{
ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription,
ANCIENT_BLOCK_THRESHOLD,
},
error::{Error, Result},
rpc::{SubstrateBeefyFinalityClient, SubstrateFinalityClient, SubstrateGrandpaFinalityClient},
sync_header::SyncHeader,
@@ -41,6 +41,7 @@ bp-messages = { path = "../../primitives/messages" }
frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-system = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-balances = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-utility = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-finality-grandpa = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -20,13 +20,18 @@ use crate::finality::{engine::Engine, FinalitySyncPipelineAdapter, SubstrateFina
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bp_header_chain::FinalityProof;
use codec::Decode;
use finality_relay::SourceClient;
use futures::stream::{unfold, Stream, StreamExt};
use futures::{
select,
stream::{try_unfold, unfold, Stream, StreamExt, TryStreamExt},
};
use num_traits::One;
use relay_substrate_client::{
BlockNumberOf, BlockWithJustification, Chain, Client, Error, HeaderOf,
};
use relay_utils::relay_loop::Client as RelayClient;
use relay_utils::{relay_loop::Client as RelayClient, UniqueSaturatedInto};
use std::pin::Pin;
/// Shared updatable reference to the maximal header number that we want to sync from the source.
@@ -70,6 +75,111 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalitySource<P> {
// target node may be missing proofs that are already available at the source
self.client.best_finalized_header_number().await
}
/// Return header and its justification of the given block or its descendant that
/// has a GRANDPA justification.
///
/// This method is optimized for cases when `block_number` is close to the best finalized
/// chain block. It first checks whether the requested block itself carries a stored
/// justification; if not, it races two proof streams (persistent justifications read from
/// block bodies and ephemeral justifications from the finality-proofs subscription) and
/// returns whichever header-with-proof arrives first.
pub async fn prove_block_finality(
&self,
block_number: BlockNumberOf<P::SourceChain>,
) -> Result<
(relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>, SubstrateFinalityProof<P>),
Error,
> {
// first, subscribe to proofs. The persistent stream starts at the *next* block,
// because the requested block itself is checked directly below
let next_persistent_proof =
self.persistent_proofs_stream(block_number + One::one()).await?.fuse();
let next_ephemeral_proof = self.ephemeral_proofs_stream(block_number).await?.fuse();
// in a perfect world we'd return the justification for the requested `block_number` itself
let (header, maybe_proof) = self.header_and_finality_proof(block_number).await?;
if let Some(proof) = maybe_proof {
return Ok((header, proof))
}
// otherwise we don't care which (descendant) header to return, so select the first
// proof produced by either stream
futures::pin_mut!(next_persistent_proof, next_ephemeral_proof);
loop {
select! {
maybe_header_and_proof = next_persistent_proof.next() => match maybe_header_and_proof {
Some(header_and_proof) => return header_and_proof,
None => continue,
},
maybe_header_and_proof = next_ephemeral_proof.next() => match maybe_header_and_proof {
Some(header_and_proof) => return header_and_proof,
None => continue,
},
// both streams are exhausted without yielding a proof
complete => return Err(Error::FinalityProofNotFound(block_number.unique_saturated_into()))
}
}
}
/// Returns stream of headers and their persistent proofs, starting from given block.
///
/// "Persistent" proofs are justifications stored inside imported block bodies. The stream
/// walks blocks one by one, skips blocks without a stored justification, and ends once it
/// passes the best finalized block number captured at stream-creation time.
async fn persistent_proofs_stream(
&self,
block_number: BlockNumberOf<P::SourceChain>,
) -> Result<
impl Stream<
Item = Result<
(
relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
SubstrateFinalityProof<P>,
),
Error,
>,
>,
Error,
> {
let client = self.client.clone();
// NOTE(review): this is read once here, so the stream will not follow the chain
// beyond the best-finalized block known at this moment
let best_finalized_block_number = self.client.best_finalized_header_number().await?;
Ok(try_unfold((client, block_number), move |(client, current_block_number)| async move {
// if we've passed the `best_finalized_block_number`, we no longer need persistent
// justifications
if current_block_number > best_finalized_block_number {
return Ok(None)
}
let (header, maybe_proof) =
header_and_finality_proof::<P>(&client, current_block_number).await?;
let next_block_number = current_block_number + One::one();
let next_state = (client, next_block_number);
Ok(Some((maybe_proof.map(|proof| (header, proof)), next_state)))
})
// drop the `None` entries produced for blocks without a stored justification
.try_filter_map(|maybe_result| async { Ok(maybe_result) }))
}
/// Returns stream of headers and their ephemeral proofs, starting from given block.
///
/// "Ephemeral" proofs come from the live finality-proofs subscription. Proofs that target
/// blocks below `block_number` are filtered out; for every remaining proof the
/// corresponding header is fetched from the node.
async fn ephemeral_proofs_stream(
&self,
block_number: BlockNumberOf<P::SourceChain>,
) -> Result<
impl Stream<
Item = Result<
(
relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
SubstrateFinalityProof<P>,
),
Error,
>,
>,
Error,
> {
let client = self.client.clone();
Ok(self.finality_proofs().await?.map(Ok).try_filter_map(move |proof| {
let client = client.clone();
async move {
// ignore proofs for blocks older than the one we're interested in
if proof.target_header_number() < block_number {
return Ok(None)
}
let header = client.header_by_number(proof.target_header_number()).await?;
Ok(Some((header.into(), proof)))
}
}))
}
}
impl<P: SubstrateFinalitySyncPipeline> Clone for SubstrateFinalitySource<P> {
@@ -119,18 +229,7 @@ impl<P: SubstrateFinalitySyncPipeline> SourceClient<FinalitySyncPipelineAdapter<
),
Error,
> {
let header_hash = self.client.block_hash_by_number(number).await?;
let signed_block = self.client.get_block(Some(header_hash)).await?;
let justification = signed_block
.justification(P::FinalityEngine::ID)
.map(|raw_justification| {
SubstrateFinalityProof::<P>::decode(&mut raw_justification.as_slice())
})
.transpose()
.map_err(Error::ResponseParseFailed)?;
Ok((signed_block.header().into(), justification))
header_and_finality_proof::<P>(&self.client, number).await
}
async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Error> {
@@ -173,3 +272,27 @@ impl<P: SubstrateFinalitySyncPipeline> SourceClient<FinalitySyncPipelineAdapter<
.boxed())
}
}
/// Reads the header of the block with the given number and, if the block body stores a
/// justification produced by `P::FinalityEngine`, decodes it into a finality proof.
///
/// Returns `Ok((header, None))` when the block exists but carries no justification for
/// this finality engine. Fails with `Error::ResponseParseFailed` if a stored
/// justification cannot be decoded.
async fn header_and_finality_proof<P: SubstrateFinalitySyncPipeline>(
client: &Client<P::SourceChain>,
number: BlockNumberOf<P::SourceChain>,
) -> Result<
(
relay_substrate_client::SyncHeader<HeaderOf<P::SourceChain>>,
Option<SubstrateFinalityProof<P>>,
),
Error,
> {
let header_hash = client.block_hash_by_number(number).await?;
let signed_block = client.get_block(Some(header_hash)).await?;
let justification = signed_block
// justifications are keyed by consensus-engine id; pick ours
.justification(P::FinalityEngine::ID)
.map(|raw_justification| {
SubstrateFinalityProof::<P>::decode(&mut raw_justification.as_slice())
})
.transpose()
.map_err(Error::ResponseParseFailed)?;
Ok((signed_block.header().into(), justification))
}
@@ -18,6 +18,9 @@
#![warn(missing_docs)]
use relay_substrate_client::Error as SubstrateError;
use std::marker::PhantomData;
pub mod error;
pub mod finality;
pub mod messages_lane;
@@ -96,3 +99,52 @@ impl<AccountId> TaggedAccount<AccountId> {
}
}
}
/// Batch call builder.
///
/// Builds a single chain call that dispatches several other calls. Implementations are
/// associated with a chain through the `SubstrateMessageLane::SourceBatchCallBuilder` /
/// `TargetBatchCallBuilder` types.
pub trait BatchCallBuilder<Call> {
/// Associated error type.
type Error;
/// If `true`, then batch calls are supported at the chain.
///
/// Callers are expected to check this flag before invoking `build_batch_call` —
/// implementations for chains without batch support may fail on every call.
const BATCH_CALL_SUPPORTED: bool;
/// Create batch call from given calls vector.
fn build_batch_call(_calls: Vec<Call>) -> Result<Call, Self::Error>;
}
// No-op builder for chains that do not support batch calls: `BATCH_CALL_SUPPORTED` is
// `false`, and calling `build_batch_call` anyway is a programming error (asserted in
// debug builds, reported as a custom error in release builds).
impl<Call> BatchCallBuilder<Call> for () {
type Error = SubstrateError;
const BATCH_CALL_SUPPORTED: bool = false;
fn build_batch_call(_calls: Vec<Call>) -> Result<Call, SubstrateError> {
debug_assert!(
false,
"only called if `BATCH_CALL_SUPPORTED` is true;\
`BATCH_CALL_SUPPORTED` is false;\
qed"
);
Err(SubstrateError::Custom("<() as BatchCallBuilder>::build_batch_call() is called".into()))
}
}
/// Batch call builder for bundled runtimes.
///
/// Works for runtimes whose code is bundled with the relay and that include
/// `pallet_utility`; calls are wrapped into a single `utility.batch_all` call.
pub struct BundledBatchCallBuilder<R>(PhantomData<R>);

impl<R> BatchCallBuilder<<R as frame_system::Config>::RuntimeCall> for BundledBatchCallBuilder<R>
where
R: pallet_utility::Config<RuntimeCall = <R as frame_system::Config>::RuntimeCall>,
<R as frame_system::Config>::RuntimeCall: From<pallet_utility::Call<R>>,
{
type Error = SubstrateError;
const BATCH_CALL_SUPPORTED: bool = true;
fn build_batch_call(
mut calls: Vec<<R as frame_system::Config>::RuntimeCall>,
) -> Result<<R as frame_system::Config>::RuntimeCall, SubstrateError> {
Ok(if calls.len() == 1 {
// a single call needs no batching wrapper
calls.remove(0)
} else {
// wrap multiple calls into one `utility.batch_all` call
pallet_utility::Call::batch_all { calls }.into()
})
}
}
@@ -20,7 +20,7 @@ use crate::{
messages_source::{SubstrateMessagesProof, SubstrateMessagesSource},
messages_target::{SubstrateMessagesDeliveryProof, SubstrateMessagesTarget},
on_demand::OnDemandRelay,
TransactionParams,
BatchCallBuilder, TransactionParams,
};
use async_std::sync::Arc;
@@ -35,7 +35,7 @@ use messages_relay::message_lane::MessageLane;
use pallet_bridge_messages::{Call as BridgeMessagesCall, Config as BridgeMessagesConfig};
use relay_substrate_client::{
transaction_stall_timeout, AccountKeyPairOf, BalanceOf, BlockNumberOf, CallOf, Chain,
ChainWithMessages, ChainWithTransactions, Client, HashOf,
ChainWithMessages, ChainWithTransactions, Client, Error as SubstrateError, HashOf,
};
use relay_utils::{
metrics::{GlobalMetrics, MetricsParams, StandaloneMetric},
@@ -55,11 +55,16 @@ pub trait SubstrateMessageLane: 'static + Clone + Debug + Send + Sync {
type ReceiveMessagesProofCallBuilder: ReceiveMessagesProofCallBuilder<Self>;
/// How receive messages delivery proof call is built?
type ReceiveMessagesDeliveryProofCallBuilder: ReceiveMessagesDeliveryProofCallBuilder<Self>;
/// How batch calls are built at the source chain?
type SourceBatchCallBuilder: BatchCallBuilder<CallOf<Self::SourceChain>, Error = SubstrateError>;
/// How batch calls are built at the target chain?
type TargetBatchCallBuilder: BatchCallBuilder<CallOf<Self::TargetChain>, Error = SubstrateError>;
}
/// Adapter that allows all `SubstrateMessageLane` to act as `MessageLane`.
#[derive(Clone, Debug)]
pub(crate) struct MessageLaneAdapter<P: SubstrateMessageLane> {
pub struct MessageLaneAdapter<P: SubstrateMessageLane> {
_phantom: PhantomData<P>,
}
@@ -90,10 +95,10 @@ pub struct MessagesRelayParams<P: SubstrateMessageLane> {
pub target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
/// Optional on-demand source to target headers relay.
pub source_to_target_headers_relay:
Option<Arc<dyn OnDemandRelay<BlockNumberOf<P::SourceChain>>>>,
Option<Arc<dyn OnDemandRelay<P::SourceChain, P::TargetChain>>>,
/// Optional on-demand target to source headers relay.
pub target_to_source_headers_relay:
Option<Arc<dyn OnDemandRelay<BlockNumberOf<P::TargetChain>>>>,
Option<Arc<dyn OnDemandRelay<P::TargetChain, P::SourceChain>>>,
/// Identifier of lane that needs to be served.
pub lane_id: LaneId,
/// Metrics parameters.
@@ -24,7 +24,7 @@ use crate::{
},
messages_target::SubstrateMessagesDeliveryProof,
on_demand::OnDemandRelay,
TransactionParams,
BatchCallBuilder, TransactionParams,
};
use async_std::sync::Arc;
@@ -41,14 +41,14 @@ use frame_support::weights::Weight;
use messages_relay::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{
ClientState, MessageDetails, MessageDetailsMap, MessageProofParameters, SourceClient,
SourceClientState,
BatchTransaction, ClientState, MessageDetails, MessageDetailsMap, MessageProofParameters,
SourceClient, SourceClientState,
},
};
use num_traits::Zero;
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client,
Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra,
AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, CallOf, Chain, ChainWithMessages,
Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra,
TransactionTracker, UnsignedTransaction,
};
use relay_utils::{relay_loop::Client as RelayClient, HeaderId};
@@ -68,7 +68,7 @@ pub struct SubstrateMessagesSource<P: SubstrateMessageLane> {
target_client: Client<P::TargetChain>,
lane_id: LaneId,
transaction_params: TransactionParams<AccountKeyPairOf<P::SourceChain>>,
target_to_source_headers_relay: Option<Arc<dyn OnDemandRelay<BlockNumberOf<P::TargetChain>>>>,
target_to_source_headers_relay: Option<Arc<dyn OnDemandRelay<P::TargetChain, P::SourceChain>>>,
}
impl<P: SubstrateMessageLane> SubstrateMessagesSource<P> {
@@ -79,7 +79,7 @@ impl<P: SubstrateMessageLane> SubstrateMessagesSource<P> {
lane_id: LaneId,
transaction_params: TransactionParams<AccountKeyPairOf<P::SourceChain>>,
target_to_source_headers_relay: Option<
Arc<dyn OnDemandRelay<BlockNumberOf<P::TargetChain>>>,
Arc<dyn OnDemandRelay<P::TargetChain, P::SourceChain>>,
>,
) -> Self {
SubstrateMessagesSource {
@@ -140,6 +140,7 @@ impl<P: SubstrateMessageLane> SourceClient<MessageLaneAdapter<P>> for SubstrateM
where
AccountIdOf<P::SourceChain>: From<<AccountKeyPairOf<P::SourceChain> as Pair>::Public>,
{
type BatchTransaction = BatchConfirmationTransaction<P>;
type TransactionTracker = TransactionTracker<P::SourceChain, Client<P::SourceChain>>;
async fn state(&self) -> Result<SourceClientState<MessageLaneAdapter<P>>, SubstrateError> {
@@ -360,10 +361,93 @@ where
.await
}
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<MessageLaneAdapter<P>>) {
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Option<Self::BatchTransaction>, SubstrateError> {
if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
if P::SourceBatchCallBuilder::BATCH_CALL_SUPPORTED {
return BatchConfirmationTransaction::<P>::new(self.clone(), id).await.map(Some)
}
target_to_source_headers_relay.require_more_headers(id.0).await;
}
Ok(None)
}
}
/// Batch transaction that brings target chain headers and message delivery confirmations
/// to the source node.
pub struct BatchConfirmationTransaction<P: SubstrateMessageLane> {
/// Source-chain client + signing params used to submit the final batch transaction.
messages_source: SubstrateMessagesSource<P>,
/// The target-chain header that `prove_calls` actually proves (may differ from the
/// originally requested header — see `required_header_id`).
proved_header: TargetHeaderIdOf<MessageLaneAdapter<P>>,
/// Calls, prepared by the on-demand headers relay, that prove `proved_header` at the
/// source chain. The delivery-confirmation call is appended in `append_proof_and_send`.
prove_calls: Vec<CallOf<P::SourceChain>>,
}

impl<P: SubstrateMessageLane> BatchConfirmationTransaction<P> {
async fn new(
messages_source: SubstrateMessagesSource<P>,
required_target_header_on_source: TargetHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Self, SubstrateError> {
// ask the on-demand relay for the calls that prove the required header (or a
// descendant of it) at the source chain
let (proved_header, prove_calls) = messages_source
.target_to_source_headers_relay
.as_ref()
.expect("BatchConfirmationTransaction is only created when target_to_source_headers_relay is Some; qed")
.prove_header(required_target_header_on_source.0)
.await?;
Ok(Self { messages_source, proved_header, prove_calls })
}
}
#[async_trait]
impl<P: SubstrateMessageLane>
BatchTransaction<
TargetHeaderIdOf<MessageLaneAdapter<P>>,
<MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof,
TransactionTracker<P::SourceChain, Client<P::SourceChain>>,
SubstrateError,
> for BatchConfirmationTransaction<P>
where
AccountIdOf<P::SourceChain>: From<<AccountKeyPairOf<P::SourceChain> as Pair>::Public>,
{
// The header that the batched prove-calls will actually prove; the caller should
// generate the receiving proof at this header.
fn required_header_id(&self) -> TargetHeaderIdOf<MessageLaneAdapter<P>> {
self.proved_header
}

// Appends the `receive_messages_delivery_proof` call to the prepared prove-header
// calls, bundles everything into a single batch call and submits it as a signed,
// watched extrinsic at the source chain.
async fn append_proof_and_send(
self,
proof: <MessageLaneAdapter<P> as MessageLane>::MessagesReceivingProof,
) -> Result<TransactionTracker<P::SourceChain, Client<P::SourceChain>>, SubstrateError> {
let mut calls = self.prove_calls;
calls.push(
P::ReceiveMessagesDeliveryProofCallBuilder::build_receive_messages_delivery_proof_call(
proof, false,
),
);
let batch_call = P::SourceBatchCallBuilder::build_batch_call(calls)?;
let (spec_version, transaction_version) =
self.messages_source.source_client.simple_runtime_version().await?;
self.messages_source
.source_client
.submit_and_watch_signed_extrinsic(
self.messages_source.transaction_params.signer.public().into(),
SignParam::<P::SourceChain> {
spec_version,
transaction_version,
genesis_hash: *self.messages_source.source_client.genesis_hash(),
signer: self.messages_source.transaction_params.signer.clone(),
},
// the transaction is mortal, limited by the configured mortality period
move |best_block_id, transaction_nonce| {
Ok(UnsignedTransaction::new(batch_call.into(), transaction_nonce).era(
TransactionEra::new(
best_block_id,
self.messages_source.transaction_params.mortality,
),
))
},
)
.await
}
}
@@ -22,7 +22,7 @@ use crate::{
messages_lane::{MessageLaneAdapter, ReceiveMessagesProofCallBuilder, SubstrateMessageLane},
messages_source::{ensure_messages_pallet_active, read_client_state, SubstrateMessagesProof},
on_demand::OnDemandRelay,
TransactionParams,
BatchCallBuilder, TransactionParams,
};
use async_std::sync::Arc;
@@ -34,10 +34,10 @@ use bp_messages::{
use bridge_runtime_common::messages::source::FromBridgedChainMessagesDeliveryProof;
use messages_relay::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{NoncesSubmitArtifacts, TargetClient, TargetClientState},
message_lane_loop::{BatchTransaction, NoncesSubmitArtifacts, TargetClient, TargetClientState},
};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client,
AccountIdOf, AccountKeyPairOf, BalanceOf, CallOf, Chain, ChainWithMessages, Client,
Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra,
TransactionTracker, UnsignedTransaction,
};
@@ -56,7 +56,7 @@ pub struct SubstrateMessagesTarget<P: SubstrateMessageLane> {
lane_id: LaneId,
relayer_id_at_source: AccountIdOf<P::SourceChain>,
transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
source_to_target_headers_relay: Option<Arc<dyn OnDemandRelay<BlockNumberOf<P::SourceChain>>>>,
source_to_target_headers_relay: Option<Arc<dyn OnDemandRelay<P::SourceChain, P::TargetChain>>>,
}
impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> {
@@ -68,7 +68,7 @@ impl<P: SubstrateMessageLane> SubstrateMessagesTarget<P> {
relayer_id_at_source: AccountIdOf<P::SourceChain>,
transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
source_to_target_headers_relay: Option<
Arc<dyn OnDemandRelay<BlockNumberOf<P::SourceChain>>>,
Arc<dyn OnDemandRelay<P::SourceChain, P::TargetChain>>,
>,
) -> Self {
SubstrateMessagesTarget {
@@ -132,6 +132,7 @@ where
AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as Pair>::Public>,
BalanceOf<P::SourceChain>: TryFrom<BalanceOf<P::TargetChain>>,
{
type BatchTransaction = BatchDeliveryTransaction<P>;
type TransactionTracker = TransactionTracker<P::TargetChain, Client<P::TargetChain>>;
async fn state(&self) -> Result<TargetClientState<MessageLaneAdapter<P>>, SubstrateError> {
@@ -267,13 +268,115 @@ where
Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
}
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<MessageLaneAdapter<P>>) {
async fn require_source_header_on_target(
&self,
id: SourceHeaderIdOf<MessageLaneAdapter<P>>,
) -> Result<Option<Self::BatchTransaction>, SubstrateError> {
if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay {
if P::TargetBatchCallBuilder::BATCH_CALL_SUPPORTED {
return BatchDeliveryTransaction::<P>::new(self.clone(), id).await.map(Some)
}
source_to_target_headers_relay.require_more_headers(id.0).await;
}
Ok(None)
}
}
/// Batch transaction that brings source chain header finality calls and the messages
/// delivery proof to the target node within a single (batch) extrinsic.
pub struct BatchDeliveryTransaction<P: SubstrateMessageLane> {
	/// Target-side messages client, used to sign and submit the batch transaction.
	messages_target: SubstrateMessagesTarget<P>,
	/// Id of the source chain header that will be proved by `prove_calls`.
	proved_header: SourceHeaderIdOf<MessageLaneAdapter<P>>,
	/// Calls that deliver the source chain header finality proof to the target chain.
	prove_calls: Vec<CallOf<P::TargetChain>>,
}
impl<P: SubstrateMessageLane> BatchDeliveryTransaction<P> {
	/// Prepare a batch delivery transaction: ask the on-demand headers relay to craft
	/// finality-proof calls for `required_source_header_on_target` (or one of its
	/// descendants) and remember which header is actually going to be proved.
	async fn new(
		messages_target: SubstrateMessagesTarget<P>,
		required_source_header_on_target: SourceHeaderIdOf<MessageLaneAdapter<P>>,
	) -> Result<Self, SubstrateError> {
		let (proved_header, prove_calls) = messages_target
			.source_to_target_headers_relay
			.as_ref()
			.expect("BatchDeliveryTransaction is only created when source_to_target_headers_relay is Some; qed")
			.prove_header(required_source_header_on_target.0)
			.await?;
		Ok(Self { messages_target, proved_header, prove_calls })
	}
}
#[async_trait]
impl<P: SubstrateMessageLane>
	BatchTransaction<
		SourceHeaderIdOf<MessageLaneAdapter<P>>,
		<MessageLaneAdapter<P> as MessageLane>::MessagesProof,
		TransactionTracker<P::TargetChain, Client<P::TargetChain>>,
		SubstrateError,
	> for BatchDeliveryTransaction<P>
where
	AccountIdOf<P::TargetChain>: From<<AccountKeyPairOf<P::TargetChain> as Pair>::Public>,
{
	// the header that will actually be proved by `prove_calls` - it may be a descendant
	// of the header that was originally required
	fn required_header_id(&self) -> SourceHeaderIdOf<MessageLaneAdapter<P>> {
		self.proved_header
	}

	/// Append the messages delivery call to the prepared header finality calls, wrap
	/// everything into a single batch call and submit it as a signed extrinsic to the
	/// target node. Returns the tracker of the submitted transaction.
	async fn append_proof_and_send(
		self,
		proof: <MessageLaneAdapter<P> as MessageLane>::MessagesProof,
	) -> Result<TransactionTracker<P::TargetChain, Client<P::TargetChain>>, SubstrateError> {
		// `proof.0` is the declared dispatch weight, `proof.1` carries the messages proof
		// together with the delivered nonces range
		let mut calls = self.prove_calls;
		calls.push(make_messages_delivery_call::<P>(
			self.messages_target.relayer_id_at_source,
			proof.1.nonces_start..=proof.1.nonces_end,
			proof,
			false,
		));
		let batch_call = P::TargetBatchCallBuilder::build_batch_call(calls)?;
		let (spec_version, transaction_version) =
			self.messages_target.target_client.simple_runtime_version().await?;
		self.messages_target
			.target_client
			.submit_and_watch_signed_extrinsic(
				self.messages_target.transaction_params.signer.public().into(),
				SignParam::<P::TargetChain> {
					spec_version,
					transaction_version,
					genesis_hash: *self.messages_target.target_client.genesis_hash(),
					signer: self.messages_target.transaction_params.signer.clone(),
				},
				move |best_block_id, transaction_nonce| {
					// make the transaction mortal, starting at the current best block
					Ok(UnsignedTransaction::new(batch_call.into(), transaction_nonce).era(
						TransactionEra::new(
							best_block_id,
							self.messages_target.transaction_params.mortality,
						),
					))
				},
			)
			.await
	}
}
/// Make messages delivery call from given proof.
fn make_messages_delivery_call<P: SubstrateMessageLane>(
	relayer_id_at_source: AccountIdOf<P::SourceChain>,
	nonces: RangeInclusive<MessageNonce>,
	proof: SubstrateMessagesProof<P::SourceChain>,
	trace_call: bool,
) -> CallOf<P::TargetChain> {
	// the nonces range is inclusive, hence the `+ 1`
	let delivered_messages = *nonces.end() - *nonces.start() + 1;
	// total declared dispatch weight of all bundled messages
	let declared_weight = proof.0;
	P::ReceiveMessagesProofCallBuilder::build_receive_messages_proof_call(
		relayer_id_at_source,
		proof,
		delivered_messages as _,
		declared_weight,
		trace_call,
	)
}
/// Make messages delivery transaction from given proof.
fn make_messages_delivery_transaction<P: SubstrateMessageLane>(
target_transaction_params: &TransactionParams<AccountKeyPairOf<P::TargetChain>>,
@@ -284,15 +387,7 @@ fn make_messages_delivery_transaction<P: SubstrateMessageLane>(
proof: SubstrateMessagesProof<P::SourceChain>,
trace_call: bool,
) -> Result<UnsignedTransaction<P::TargetChain>, SubstrateError> {
let messages_count = nonces.end() - nonces.start() + 1;
let dispatch_weight = proof.0;
let call = P::ReceiveMessagesProofCallBuilder::build_receive_messages_proof_call(
relayer_id_at_source,
proof,
messages_count as _,
dispatch_weight,
trace_call,
);
let call = make_messages_delivery_call::<P>(relayer_id_at_source, nonces, proof, trace_call);
Ok(UnsignedTransaction::new(call.into(), transaction_nonce)
.era(TransactionEra::new(target_best_block_id, target_transaction_params.mortality)))
}
@@ -16,15 +16,21 @@
//! On-demand Substrate -> Substrate header finality relay.
use crate::finality::SubmitFinalityProofCallBuilder;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bp_header_chain::ConsensusLogReader;
use bp_runtime::HeaderIdProvider;
use futures::{select, FutureExt};
use num_traits::{One, Zero};
use sp_runtime::traits::Header;
use finality_relay::{FinalitySyncParams, TargetClient as FinalityTargetClient};
use relay_substrate_client::{AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError,
HeaderIdOf,
};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError,
STALL_TIMEOUT,
@@ -47,16 +53,18 @@ use crate::{
/// relay) needs it to continue its regular work. When enough headers are relayed, on-demand stops
/// syncing headers.
#[derive(Clone)]
pub struct OnDemandHeadersRelay<SourceChain: Chain> {
pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline> {
/// Relay task name.
relay_task_name: String,
/// Shared reference to maximal required finalized header number.
required_header_number: RequiredHeaderNumberRef<SourceChain>,
required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
/// Client of the source chain.
source_client: Client<P::SourceChain>,
}
impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
/// Create new on-demand headers relay.
pub fn new<P: SubstrateFinalitySyncPipeline<SourceChain = SourceChain>>(
pub fn new(
source_client: Client<P::SourceChain>,
target_client: Client<P::TargetChain>,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
@@ -70,6 +78,7 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
let this = OnDemandHeadersRelay {
relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
required_header_number: required_header_number.clone(),
source_client: source_client.clone(),
};
async_std::task::spawn(async move {
background_task::<P>(
@@ -87,23 +96,49 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
}
#[async_trait]
impl<SourceChain: Chain> OnDemandRelay<BlockNumberOf<SourceChain>>
for OnDemandHeadersRelay<SourceChain>
impl<P: SubstrateFinalitySyncPipeline> OnDemandRelay<P::SourceChain, P::TargetChain>
for OnDemandHeadersRelay<P>
{
async fn require_more_headers(&self, required_header: BlockNumberOf<SourceChain>) {
async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
let mut required_header_number = self.required_header_number.lock().await;
if required_header > *required_header_number {
log::trace!(
target: "bridge",
"[{}] More {} headers required. Going to sync up to the {}",
self.relay_task_name,
SourceChain::NAME,
P::SourceChain::NAME,
required_header,
);
*required_header_number = required_header;
}
}
	// See the `OnDemandRelay::prove_header` docs: returns the id of the header that is
	// actually proved (may be a descendant of `required_header`) and the calls that
	// deliver its finality proof to the target chain.
	async fn prove_header(
		&self,
		required_header: BlockNumberOf<P::SourceChain>,
	) -> Result<(HeaderIdOf<P::SourceChain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
		// first find proper header (either the `required_header` itself, or its descendant)
		// that we can prove finality of
		let finality_source = SubstrateFinalitySource::<P>::new(self.source_client.clone(), None);
		let (header, proof) = finality_source.prove_block_finality(required_header).await?;
		let header_id = header.id();
		log::debug!(
			target: "bridge",
			"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?}",
			self.relay_task_name,
			P::SourceChain::NAME,
			required_header,
			P::SourceChain::NAME,
			header_id,
		);

		// and then craft the submit-proof call
		let call =
			P::SubmitFinalityProofCallBuilder::build_submit_finality_proof_call(header, proof);
		Ok((header_id, vec![call]))
	}
}
/// Background task that is responsible for starting headers relay.
@@ -18,18 +18,28 @@
//! on-demand pipelines.
use async_trait::async_trait;
use relay_substrate_client::{BlockNumberOf, CallOf, Chain, Error as SubstrateError, HeaderIdOf};
pub mod headers;
pub mod parachains;
/// On-demand headers relay that is relaying finalizing headers only when requested.
#[async_trait]
pub trait OnDemandRelay<SourceHeaderNumber>: Send + Sync {
pub trait OnDemandRelay<SourceChain: Chain, TargetChain: Chain>: Send + Sync {
/// Ask relay to relay source header with given number to the target chain.
///
/// Depending on implementation, on-demand relay may also relay `required_header` ancestors
/// (e.g. if they're mandatory), or its descendants. The request is considered complete if
/// the best available header at the target chain has number that is larger than or equal
/// to the `required_header`.
async fn require_more_headers(&self, required_header: SourceHeaderNumber);
async fn require_more_headers(&self, required_header: BlockNumberOf<SourceChain>);
/// Ask relay to prove source `required_header` to the `TargetChain`.
///
/// Returns the id of the header that is proved (it may be the `required_header` or one of
/// its descendants) and the calls for delivering the proof.
async fn prove_header(
&self,
required_header: BlockNumberOf<SourceChain>,
) -> Result<(HeaderIdOf<SourceChain>, Vec<CallOf<TargetChain>>), SubstrateError>;
}
@@ -21,7 +21,7 @@ use crate::{
on_demand::OnDemandRelay,
parachains::{
source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter,
SubstrateParachainsPipeline,
SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline,
},
TransactionParams,
};
@@ -31,18 +31,21 @@ use async_std::{
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use bp_polkadot_core::parachains::ParaHash;
use bp_polkadot_core::parachains::{ParaHash, ParaId};
use bp_runtime::HeaderIdProvider;
use futures::{select, FutureExt};
use num_traits::Zero;
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use parachains_relay::parachains_loop::{AvailableHeader, ParachainSyncParams, TargetClient};
use parachains_relay::parachains_loop::{
AvailableHeader, ParachainSyncParams, SourceClient, TargetClient,
};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
ParachainBase,
AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError,
HashOf, HeaderIdOf, ParachainBase, ANCIENT_BLOCK_THRESHOLD,
};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId,
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient,
HeaderId, UniqueSaturatedInto,
};
use std::fmt::Debug;
@@ -52,25 +55,32 @@ use std::fmt::Debug;
/// (e.g. messages relay) needs it to continue its regular work. When enough parachain headers
/// are relayed, on-demand stops syncing headers.
#[derive(Clone)]
pub struct OnDemandParachainsRelay<SourceParachain: Chain> {
pub struct OnDemandParachainsRelay<P: SubstrateParachainsPipeline> {
/// Relay task name.
relay_task_name: String,
/// Channel used to communicate with background task and ask for relay of parachain heads.
required_header_number_sender: Sender<BlockNumberOf<SourceParachain>>,
required_header_number_sender: Sender<BlockNumberOf<P::SourceParachain>>,
/// Source relay chain client.
source_relay_client: Client<P::SourceRelayChain>,
/// Target chain client.
target_client: Client<P::TargetChain>,
/// On-demand relay chain relay.
on_demand_source_relay_to_target_headers:
Arc<dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>>,
}
impl<SourceParachain: Chain> OnDemandParachainsRelay<SourceParachain> {
impl<P: SubstrateParachainsPipeline> OnDemandParachainsRelay<P> {
/// Create new on-demand parachains relay.
///
/// Note that the argument is the source relay chain client, not the parachain client.
/// That's because parachain finality is determined by the relay chain and we don't
/// need to connect to the parachain itself here.
pub fn new<P: SubstrateParachainsPipeline<SourceParachain = SourceParachain>>(
pub fn new(
source_relay_client: Client<P::SourceRelayChain>,
target_client: Client<P::TargetChain>,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
on_demand_source_relay_to_target_headers: Arc<
dyn OnDemandRelay<BlockNumberOf<P::SourceRelayChain>>,
dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
>,
) -> Self
where
@@ -82,8 +92,13 @@ impl<SourceParachain: Chain> OnDemandParachainsRelay<SourceParachain> {
{
let (required_header_number_sender, required_header_number_receiver) = unbounded();
let this = OnDemandParachainsRelay {
relay_task_name: on_demand_parachains_relay_name::<SourceParachain, P::TargetChain>(),
relay_task_name: on_demand_parachains_relay_name::<P::SourceParachain, P::TargetChain>(
),
required_header_number_sender,
source_relay_client: source_relay_client.clone(),
target_client: target_client.clone(),
on_demand_source_relay_to_target_headers: on_demand_source_relay_to_target_headers
.clone(),
};
async_std::task::spawn(async move {
background_task::<P>(
@@ -101,23 +116,109 @@ impl<SourceParachain: Chain> OnDemandParachainsRelay<SourceParachain> {
}
#[async_trait]
impl<SourceParachain> OnDemandRelay<BlockNumberOf<SourceParachain>>
for OnDemandParachainsRelay<SourceParachain>
impl<P: SubstrateParachainsPipeline> OnDemandRelay<P::SourceParachain, P::TargetChain>
for OnDemandParachainsRelay<P>
where
SourceParachain: Chain,
P::SourceParachain: Chain<Hash = ParaHash>,
{
async fn require_more_headers(&self, required_header: BlockNumberOf<SourceParachain>) {
async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
if let Err(e) = self.required_header_number_sender.send(required_header).await {
log::trace!(
target: "bridge",
"[{}] Failed to request {} header {:?}: {:?}",
self.relay_task_name,
SourceParachain::NAME,
P::SourceParachain::NAME,
required_header,
e,
);
}
}
	/// Ask relay to prove source `required_header` to the `TargetChain`.
	///
	/// Returns the id of the parachain head that is actually proved (may be a descendant of
	/// the required one) and the calls for delivering the relay chain finality proof (if
	/// needed) plus the parachain head proof.
	async fn prove_header(
		&self,
		required_parachain_header: BlockNumberOf<P::SourceParachain>,
	) -> Result<(HeaderIdOf<P::SourceParachain>, Vec<CallOf<P::TargetChain>>), SubstrateError> {
		// select headers to prove
		let parachains_source = ParachainsSource::<P>::new(
			self.source_relay_client.clone(),
			Arc::new(Mutex::new(AvailableHeader::Missing)),
		);
		let env = (self, &parachains_source);
		let (need_to_prove_relay_block, selected_relay_block, selected_parachain_block) =
			select_headers_to_prove(env, required_parachain_header).await?;

		log::debug!(
			target: "bridge",
			"[{}] Requested to prove {} head {:?}. Selected to prove {} head {:?} and {} head {:?}",
			self.relay_task_name,
			P::SourceParachain::NAME,
			required_parachain_header,
			P::SourceParachain::NAME,
			selected_parachain_block,
			P::SourceRelayChain::NAME,
			if need_to_prove_relay_block {
				Some(selected_relay_block)
			} else {
				None
			},
		);

		// now let's prove relay chain block (if needed)
		let mut calls = Vec::new();
		let mut proved_relay_block = selected_relay_block;
		if need_to_prove_relay_block {
			let (relay_block, relay_prove_call) = self
				.on_demand_source_relay_to_target_headers
				.prove_header(selected_relay_block.number())
				.await?;
			proved_relay_block = relay_block;
			calls.extend(relay_prove_call);
		}

		// despite what we've selected before (in `select_headers_to_prove` call), if the
		// headers relay has chosen a different header (e.g. because there's no GRANDPA
		// justification for it), we need to prove the parachain head that is available at
		// that (actually proved) header
		let para_id = ParaId(P::SourceParachain::PARACHAIN_ID);
		let mut proved_parachain_block = selected_parachain_block;
		if proved_relay_block != selected_relay_block {
			proved_parachain_block = parachains_source
				.on_chain_para_head_id(proved_relay_block, para_id)
				.await?
				// this could happen e.g. if the parachain has been offboarded?
				.ok_or_else(|| {
					SubstrateError::MissingRequiredParachainHead(
						para_id,
						proved_relay_block.number().unique_saturated_into(),
					)
				})?;

			log::debug!(
				target: "bridge",
				"[{}] Selected to prove {} head {:?} and {} head {:?}. Instead proved {} head {:?} and {} head {:?}",
				self.relay_task_name,
				P::SourceParachain::NAME,
				selected_parachain_block,
				P::SourceRelayChain::NAME,
				selected_relay_block,
				P::SourceParachain::NAME,
				proved_parachain_block,
				P::SourceRelayChain::NAME,
				proved_relay_block,
			);
		}

		// and finally - prove parachain head
		let (para_proof, para_hashes) =
			parachains_source.prove_parachain_heads(proved_relay_block, &[para_id]).await?;
		calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call(
			proved_relay_block,
			para_hashes.into_iter().map(|h| (para_id, h)).collect(),
			para_proof,
		));

		Ok((proved_parachain_block, calls))
	}
}
/// Background task that is responsible for starting parachain headers relay.
@@ -126,7 +227,7 @@ async fn background_task<P: SubstrateParachainsPipeline>(
target_client: Client<P::TargetChain>,
target_transaction_params: TransactionParams<AccountKeyPairOf<P::TargetChain>>,
on_demand_source_relay_to_target_headers: Arc<
dyn OnDemandRelay<BlockNumberOf<P::SourceRelayChain>>,
dyn OnDemandRelay<P::SourceRelayChain, P::TargetChain>,
>,
required_parachain_header_number_receiver: Receiver<BlockNumberOf<P::SourceParachain>>,
) where
@@ -487,6 +588,125 @@ where
RelayState::RelayingParaHeader(para_header_at_source)
}
/// Environment for the `select_headers_to_prove` call.
///
/// Type parameters are: `RBN`/`RBH` - relay chain block number/hash;
/// `PBN`/`PBH` - parachain block number/hash.
#[async_trait]
trait SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH> {
	/// Returns associated parachain id.
	fn parachain_id(&self) -> ParaId;
	/// Returns best finalized relay block, known to the source relay chain node.
	async fn best_finalized_relay_block_at_source(
		&self,
	) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
	/// Returns best finalized relay block that is known at `P::TargetChain`.
	async fn best_finalized_relay_block_at_target(
		&self,
	) -> Result<HeaderId<RBH, RBN>, SubstrateError>;
	/// Returns best finalized parachain block at given source relay chain block.
	async fn best_finalized_para_block_at_source(
		&self,
		at_relay_block: HeaderId<RBH, RBN>,
	) -> Result<Option<HeaderId<PBH, PBN>>, SubstrateError>;
}
// production implementation of the environment, backed by the actual relay/parachain clients
#[async_trait]
impl<'a, P: SubstrateParachainsPipeline>
	SelectHeadersToProveEnvironment<
		BlockNumberOf<P::SourceRelayChain>,
		HashOf<P::SourceRelayChain>,
		BlockNumberOf<P::SourceParachain>,
		HashOf<P::SourceParachain>,
	> for (&'a OnDemandParachainsRelay<P>, &'a ParachainsSource<P>)
{
	fn parachain_id(&self) -> ParaId {
		ParaId(P::SourceParachain::PARACHAIN_ID)
	}

	async fn best_finalized_relay_block_at_source(
		&self,
	) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
		// ask the source relay chain node directly
		Ok(self.0.source_relay_client.best_finalized_header().await?.id())
	}

	async fn best_finalized_relay_block_at_target(
		&self,
	) -> Result<HeaderIdOf<P::SourceRelayChain>, SubstrateError> {
		// read the best relay block, known to the target chain, from its runtime state
		Ok(crate::messages_source::read_client_state::<P::TargetChain, P::SourceRelayChain>(
			&self.0.target_client,
			None,
			P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD,
		)
		.await?
		.best_finalized_peer_at_best_self)
	}

	async fn best_finalized_para_block_at_source(
		&self,
		at_relay_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
		// parachain head, as it is stored on the relay chain at given block
		self.1.on_chain_para_head_id(at_relay_block, self.parachain_id()).await
	}
}
/// Given request to prove `required_parachain_header`, select actual headers that need to be
/// proved.
///
/// Returns `(need_to_prove_relay_block, relay_block, parachain_block)`: whether a finality
/// proof for the relay block must be submitted, the relay block at which the parachain head
/// will be proved and the parachain head that is going to be proved.
async fn select_headers_to_prove<RBN, RBH, PBN, PBH>(
	env: impl SelectHeadersToProveEnvironment<RBN, RBH, PBN, PBH>,
	required_parachain_header: PBN,
) -> Result<(bool, HeaderId<RBH, RBN>, HeaderId<PBH, PBN>), SubstrateError>
where
	RBH: Copy,
	RBN: BlockNumberBase,
	PBH: Copy,
	PBN: BlockNumberBase,
{
	// a parachain heads proof is anchored at some relay chain block, so start by looking at
	// the candidate relay blocks
	let relay_block_at_source = env.best_finalized_relay_block_at_source().await?;
	let relay_block_at_target = env.best_finalized_relay_block_at_target().await?;

	// the parachain head at the best source-known relay block is the newest head we may
	// possibly prove; if even that head is older than the required one, we can't do anything
	// (this shall not actually happen, given current code, because we only require finalized
	// headers)
	let best_possible_parachain_block = env
		.best_finalized_para_block_at_source(relay_block_at_source)
		.await?
		.filter(|para_block| para_block.number() >= required_parachain_header)
		.ok_or(SubstrateError::MissingRequiredParachainHead(
			env.parachain_id(),
			required_parachain_header.unique_saturated_into(),
		))?;

	// if the relay block that the target chain already knows is good enough, reuse it and
	// avoid bundling an extra relay chain finality proof
	if let Some(para_block_at_target) =
		env.best_finalized_para_block_at_source(relay_block_at_target).await?
	{
		if para_block_at_target.number() >= required_parachain_header {
			// we don't require the source node to be an archive node, so we can't craft
			// storage proofs using ancient headers - only reuse the target-known relay
			// block if it is recent enough
			let distance = relay_block_at_source
				.number()
				.saturating_sub(relay_block_at_target.number());
			if distance <= RBN::from(ANCIENT_BLOCK_THRESHOLD) {
				return Ok((false, relay_block_at_target, para_block_at_target))
			}
		}
	}

	// otherwise the best source-known relay block must be proved first
	Ok((true, relay_block_at_source, best_possible_parachain_block))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -705,4 +925,80 @@ mod tests {
RelayState::RelayingRelayHeader(800),
);
}
// tuple is:
//
// - best_finalized_relay_block_at_source
// - best_finalized_relay_block_at_target
// - best_finalized_para_block_at_source at best_finalized_relay_block_at_source
// - best_finalized_para_block_at_source at best_finalized_relay_block_at_target
#[async_trait]
impl SelectHeadersToProveEnvironment<u32, u32, u32, u32> for (u32, u32, u32, u32) {
fn parachain_id(&self) -> ParaId {
ParaId(0)
}
async fn best_finalized_relay_block_at_source(
&self,
) -> Result<HeaderId<u32, u32>, SubstrateError> {
Ok(HeaderId(self.0, self.0))
}
async fn best_finalized_relay_block_at_target(
&self,
) -> Result<HeaderId<u32, u32>, SubstrateError> {
Ok(HeaderId(self.1, self.1))
}
async fn best_finalized_para_block_at_source(
&self,
at_relay_block: HeaderId<u32, u32>,
) -> Result<Option<HeaderId<u32, u32>>, SubstrateError> {
if at_relay_block.0 == self.0 {
Ok(Some(HeaderId(self.2, self.2)))
} else if at_relay_block.0 == self.1 {
Ok(Some(HeaderId(self.3, self.3)))
} else {
Ok(None)
}
}
}
	#[async_std::test]
	async fn select_headers_to_prove_returns_err_if_required_para_block_is_missing_at_source() {
		// required para block 300 is above the best para block (200) known at source => error
		assert!(matches!(
			select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 300_u32,).await,
			Err(SubstrateError::MissingRequiredParachainHead(ParaId(0), 300_u64)),
		));
	}
	#[async_std::test]
	async fn select_headers_to_prove_fails_to_use_existing_ancient_relay_block() {
		// target-known relay block 10 satisfies the request, but it is too far (220 - 10)
		// behind the source best block => a new relay block (220) must be proved
		assert_eq!(
			select_headers_to_prove((220_u32, 10_u32, 200_u32, 100_u32), 100_u32,)
				.await
				.map_err(drop),
			Ok((true, HeaderId(220, 220), HeaderId(200, 200))),
		);
	}
	#[async_std::test]
	async fn select_headers_to_prove_is_able_to_use_existing_recent_relay_block() {
		// target-known relay block 10 is recent enough and its para head (100) satisfies
		// the request => no new relay block proof is needed
		assert_eq!(
			select_headers_to_prove((40_u32, 10_u32, 200_u32, 100_u32), 100_u32,)
				.await
				.map_err(drop),
			Ok((false, HeaderId(10, 10), HeaderId(100, 100))),
		);
	}
	#[async_std::test]
	async fn select_headers_to_prove_uses_new_relay_block() {
		// para head at target-known relay block (100) is below the required one (200)
		// => the source best relay block (20) must be proved
		assert_eq!(
			select_headers_to_prove((20_u32, 10_u32, 200_u32, 100_u32), 200_u32,)
				.await
				.map_err(drop),
			Ok((true, HeaderId(20, 20), HeaderId(200, 200))),
		);
	}
}
+232 -29
View File
@@ -109,9 +109,28 @@ pub struct NoncesSubmitArtifacts<T> {
pub tx_tracker: T,
}
/// Batch transaction that already submits some headers and needs to be extended with
/// messages/delivery proof before sending.
#[async_trait]
pub trait BatchTransaction<HeaderId, Proof, TransactionTracker, Error>: Send {
	/// Header that was required in the original call and which is bundled within this
	/// batch transaction.
	fn required_header_id(&self) -> HeaderId;

	/// Append proof and send transaction to the connected node.
	async fn append_proof_and_send(self, proof: Proof) -> Result<TransactionTracker, Error>;
}
/// Source client trait.
#[async_trait]
pub trait SourceClient<P: MessageLane>: RelayClient {
/// Type of batch transaction that submits finality and message receiving proof.
type BatchTransaction: BatchTransaction<
TargetHeaderIdOf<P>,
P::MessagesReceivingProof,
Self::TransactionTracker,
Self::Error,
>;
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker<HeaderId = SourceHeaderIdOf<P>>;
@@ -156,12 +175,31 @@ pub trait SourceClient<P: MessageLane>: RelayClient {
) -> Result<Self::TransactionTracker, Self::Error>;
/// We need given finalized target header on source to continue synchronization.
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>);
///
/// We assume that the absence of header `id` has already been checked by caller.
///
/// The client may return `Some(_)`, which means that nothing has happened yet and
/// the caller must generate and append message receiving proof to the batch transaction
/// to actually send it (along with required header) to the node.
///
/// If function has returned `None`, it means that the caller now must wait for the
/// appearance of the target header `id` at the source client.
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
}
/// Target client trait.
#[async_trait]
pub trait TargetClient<P: MessageLane>: RelayClient {
/// Type of batch transaction that submits finality and messages proof.
type BatchTransaction: BatchTransaction<
SourceHeaderIdOf<P>,
P::MessagesProof,
Self::TransactionTracker,
Self::Error,
>;
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>;
@@ -201,7 +239,17 @@ pub trait TargetClient<P: MessageLane>: RelayClient {
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
/// We need given finalized source header on target to continue synchronization.
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>);
///
/// The client may return `Some(_)`, which means that nothing has happened yet and
/// the caller must generate and append messages proof to the batch transaction
/// to actually send it (along with required header) to the node.
///
/// If function has returned `None`, it means that the caller now must wait for the
/// appearance of the source header `id` at the target client.
async fn require_source_header_on_target(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
}
/// State of the client.
@@ -483,6 +531,61 @@ pub(crate) mod tests {
type TargetHeaderHash = TestTargetHeaderHash;
}
	// test batch transaction that delivers messages (source header + messages proof)
	#[derive(Clone, Debug)]
	pub struct TestMessagesBatchTransaction {
		/// Shared test-client state, mutated when the "transaction" is sent.
		data: Arc<Mutex<TestClientData>>,
		/// Header id reported from `required_header_id`.
		required_header_id: TestSourceHeaderId,
		/// Tracker returned from `append_proof_and_send`.
		tx_tracker: TestTransactionTracker,
	}
	#[async_trait]
	impl BatchTransaction<TestSourceHeaderId, TestMessagesProof, TestTransactionTracker, TestError>
		for TestMessagesBatchTransaction
	{
		fn required_header_id(&self) -> TestSourceHeaderId {
			self.required_header_id
		}

		async fn append_proof_and_send(
			self,
			proof: TestMessagesProof,
		) -> Result<TestTransactionTracker, TestError> {
			// emulate the batch transaction: immediately "deliver" messages to the target
			let mut data = self.data.lock();
			data.receive_messages(proof);
			Ok(self.tx_tracker)
		}
	}
	// test batch transaction that delivers confirmations (target header + delivery proof)
	#[derive(Clone, Debug)]
	pub struct TestConfirmationBatchTransaction {
		/// Shared test-client state, mutated when the "transaction" is sent.
		data: Arc<Mutex<TestClientData>>,
		/// Header id reported from `required_header_id`.
		required_header_id: TestTargetHeaderId,
		/// Tracker returned from `append_proof_and_send`.
		tx_tracker: TestTransactionTracker,
	}
	#[async_trait]
	impl
		BatchTransaction<
			TestTargetHeaderId,
			TestMessagesReceivingProof,
			TestTransactionTracker,
			TestError,
		> for TestConfirmationBatchTransaction
	{
		fn required_header_id(&self) -> TestTargetHeaderId {
			self.required_header_id
		}

		async fn append_proof_and_send(
			self,
			proof: TestMessagesReceivingProof,
		) -> Result<TestTransactionTracker, TestError> {
			// emulate the batch transaction: immediately "deliver" the confirmation to the source
			let mut data = self.data.lock();
			data.receive_messages_delivery_proof(proof);
			Ok(self.tx_tracker)
		}
	}
#[derive(Clone, Debug)]
pub struct TestTransactionTracker(TrackedTransactionStatus<TestTargetHeaderId>);
@@ -517,8 +620,10 @@ pub(crate) mod tests {
target_latest_confirmed_received_nonce: MessageNonce,
target_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_proofs: Vec<TestMessagesProof>,
target_to_source_batch_transaction: Option<TestConfirmationBatchTransaction>,
target_to_source_header_required: Option<TestTargetHeaderId>,
target_to_source_header_requirements: Vec<TestTargetHeaderId>,
source_to_target_batch_transaction: Option<TestMessagesBatchTransaction>,
source_to_target_header_required: Option<TestSourceHeaderId>,
source_to_target_header_requirements: Vec<TestSourceHeaderId>,
}
@@ -546,14 +651,38 @@ pub(crate) mod tests {
Default::default(),
)),
submitted_messages_proofs: Vec::new(),
target_to_source_batch_transaction: None,
target_to_source_header_required: None,
target_to_source_header_requirements: Vec::new(),
source_to_target_batch_transaction: None,
source_to_target_header_required: None,
source_to_target_header_requirements: Vec::new(),
}
}
}
	impl TestClientData {
		/// Emulate delivery of given messages proof to the target chain: advance target best
		/// (finalized) block and update received/confirmed nonces.
		fn receive_messages(&mut self, proof: TestMessagesProof) {
			self.target_state.best_self =
				HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
			self.target_state.best_finalized_self = self.target_state.best_self;
			self.target_latest_received_nonce = *proof.0.end();
			// confirmed nonce is only bundled into some proofs
			if let Some(target_latest_confirmed_received_nonce) = proof.1 {
				self.target_latest_confirmed_received_nonce =
					target_latest_confirmed_received_nonce;
			}
			self.submitted_messages_proofs.push(proof);
		}

		/// Emulate delivery of given delivery confirmation to the source chain: advance source
		/// best (finalized) block and record the confirmed nonce.
		fn receive_messages_delivery_proof(&mut self, proof: TestMessagesReceivingProof) {
			self.source_state.best_self =
				HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
			self.source_state.best_finalized_self = self.source_state.best_self;
			self.submitted_messages_receiving_proofs.push(proof);
			self.source_latest_confirmed_received_nonce = proof;
		}
	}
#[derive(Clone)]
pub struct TestSourceClient {
data: Arc<Mutex<TestClientData>>,
@@ -588,6 +717,7 @@ pub(crate) mod tests {
#[async_trait]
impl SourceClient<TestMessageLane> for TestSourceClient {
type BatchTransaction = TestConfirmationBatchTransaction;
type TransactionTracker = TestTransactionTracker;
async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> {
@@ -675,21 +805,25 @@ pub(crate) mod tests {
) -> Result<Self::TransactionTracker, TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
data.submitted_messages_receiving_proofs.push(proof);
data.source_latest_confirmed_received_nonce = proof;
data.receive_messages_delivery_proof(proof);
(self.post_tick)(&mut data);
Ok(TestTransactionTracker(data.source_tracked_transaction_status))
}
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<TestMessageLane>) {
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<Option<Self::BatchTransaction>, Self::Error> {
let mut data = self.data.lock();
data.target_to_source_header_required = Some(id);
data.target_to_source_header_requirements.push(id);
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok(data.target_to_source_batch_transaction.take().map(|mut tx| {
tx.required_header_id = id;
tx
}))
}
}
@@ -727,6 +861,7 @@ pub(crate) mod tests {
#[async_trait]
impl TargetClient<TestMessageLane> for TestTargetClient {
type BatchTransaction = TestMessagesBatchTransaction;
type TransactionTracker = TestTransactionTracker;
async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> {
@@ -798,15 +933,7 @@ pub(crate) mod tests {
if data.is_target_fails {
return Err(TestError)
}
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
data.target_latest_received_nonce = *proof.0.end();
if let Some(target_latest_confirmed_received_nonce) = proof.1 {
data.target_latest_confirmed_received_nonce =
target_latest_confirmed_received_nonce;
}
data.submitted_messages_proofs.push(proof);
data.receive_messages(proof);
(self.post_tick)(&mut data);
Ok(NoncesSubmitArtifacts {
nonces,
@@ -814,17 +941,25 @@ pub(crate) mod tests {
})
}
async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<TestMessageLane>) {
/// Test implementation: ask the relay to make source header `id` available at the
/// target chain, optionally returning a prepared batch transaction.
async fn require_source_header_on_target(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
) -> Result<Option<Self::BatchTransaction>, Self::Error> {
let mut data = self.data.lock();
// remember the most recent requirement — test tick callbacks `take()` this
// to decide when to install a batch transaction
data.source_to_target_header_required = Some(id);
// also record every requirement, so tests can assert that headers were requested
data.source_to_target_header_requirements.push(id);
(self.tick)(&mut data);
(self.post_tick)(&mut data);
// if the test has staged a batch transaction, hand it to the caller with the
// required header id filled in; returning `Some(_)` tells the race loop that
// it must append the proof to this transaction itself
Ok(data.source_to_target_batch_transaction.take().map(|mut tx| {
tx.required_header_id = id;
tx
}))
}
}
fn run_loop_test(
data: TestClientData,
data: Arc<Mutex<TestClientData>>,
source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
source_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
@@ -832,8 +967,6 @@ pub(crate) mod tests {
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> TestClientData {
async_std::task::block_on(async {
let data = Arc::new(Mutex::new(data));
let source_client = TestSourceClient {
data: data.clone(),
tick: source_tick,
@@ -876,7 +1009,7 @@ pub(crate) mod tests {
// able to deliver messages.
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
TestClientData {
Arc::new(Mutex::new(TestClientData {
is_source_fails: true,
source_state: ClientState {
best_self: HeaderId(0, 0),
@@ -893,7 +1026,7 @@ pub(crate) mod tests {
},
target_latest_received_nonce: 0,
..Default::default()
},
})),
Arc::new(|data: &mut TestClientData| {
if data.is_source_reconnected {
data.is_source_fails = false;
@@ -929,7 +1062,7 @@ pub(crate) mod tests {
let (source_exit_sender, exit_receiver) = unbounded();
let target_exit_sender = source_exit_sender.clone();
let result = run_loop_test(
TestClientData {
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
@@ -947,7 +1080,7 @@ pub(crate) mod tests {
target_latest_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Lost,
..Default::default()
},
})),
Arc::new(move |data: &mut TestClientData| {
if data.is_source_reconnected {
data.source_tracked_transaction_status =
@@ -980,7 +1113,7 @@ pub(crate) mod tests {
// their corresponding nonce won't be updated => reconnect will happen
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
TestClientData {
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
@@ -996,7 +1129,7 @@ pub(crate) mod tests {
},
target_latest_received_nonce: 0,
..Default::default()
},
})),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.source_state.best_self =
@@ -1054,7 +1187,7 @@ pub(crate) mod tests {
fn message_lane_loop_works() {
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
TestClientData {
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10),
@@ -1070,7 +1203,7 @@ pub(crate) mod tests {
},
target_latest_received_nonce: 0,
..Default::default()
},
})),
Arc::new(|data: &mut TestClientData| {
// blocks are produced on every tick
data.source_state.best_self =
@@ -1133,4 +1266,74 @@ pub(crate) mod tests {
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
/// End-to-end message-lane loop test where both the source and the target client
/// answer `require_*_header_*` calls with batch transactions, so headers and
/// proofs are submitted as a single (emulated) transaction.
#[test]
fn message_lane_loop_works_with_batch_transactions() {
let (exit_sender, exit_receiver) = unbounded();
// shared client state: source has 10 generated messages, target has received none
let original_data = Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 10,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
..Default::default()
}));
let target_original_data = original_data.clone();
let source_original_data = original_data.clone();
let result = run_loop_test(
original_data,
Arc::new(|_| {}),
// source post-tick: whenever a target header has been required, stage a
// confirmation batch transaction that carries that header
Arc::new(move |data: &mut TestClientData| {
if let Some(target_to_source_header_required) =
data.target_to_source_header_required.take()
{
data.target_to_source_batch_transaction =
Some(TestConfirmationBatchTransaction {
data: source_original_data.clone(),
required_header_id: target_to_source_header_required,
tx_tracker: TestTransactionTracker::default(),
})
}
}),
Arc::new(|_| {}),
// target post-tick: whenever a source header has been required, stage a
// messages batch transaction; stop the loop once all 10 deliveries are confirmed
Arc::new(move |data: &mut TestClientData| {
if let Some(source_to_target_header_required) =
data.source_to_target_header_required.take()
{
data.source_to_target_batch_transaction = Some(TestMessagesBatchTransaction {
data: target_original_data.clone(),
required_header_id: source_to_target_header_required,
tx_tracker: TestTransactionTracker::default(),
})
}
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// there are no strict restrictions on when reward confirmation should come
// (because `max_unconfirmed_nonces_at_target` is `100` in tests and this confirmation
// depends on the state of both clients)
// => we do not check it here
// all 10 messages must have been delivered in exactly these three batches
assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
// check that we have at least once required new source->target or target->source headers
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
}
@@ -171,9 +171,13 @@ where
{
type Error = C::Error;
type TargetNoncesData = DeliveryRaceTargetNoncesData;
type BatchTransaction = C::BatchTransaction;
type TransactionTracker = C::TransactionTracker;
async fn require_source_header(&self, id: SourceHeaderIdOf<P>) {
/// Forward the "make source header `id` available at the target chain" request to
/// the wrapped message-lane client; an optional batch transaction is relayed back
/// to the race loop unchanged.
async fn require_source_header(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<Option<C::BatchTransaction>, Self::Error> {
let lane_client = &self.client;
lane_client.require_source_header_on_target(id).await
}
+104 -19
View File
@@ -20,7 +20,7 @@
//! associated data - like messages, lane state, etc) to the target node by
//! generating and submitting proof.
use crate::message_lane_loop::{ClientState, NoncesSubmitArtifacts};
use crate::message_lane_loop::{BatchTransaction, ClientState, NoncesSubmitArtifacts};
use async_trait::async_trait;
use bp_messages::MessageNonce;
@@ -127,12 +127,29 @@ pub trait TargetClient<P: MessageRace> {
type Error: std::fmt::Debug + MaybeConnectionError;
/// Type of the additional data from the target client, used by the race.
type TargetNoncesData: std::fmt::Debug;
/// Type of batch transaction that submits finality and proof to the target node.
type BatchTransaction: BatchTransaction<
P::SourceHeaderId,
P::Proof,
Self::TransactionTracker,
Self::Error,
>;
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker<HeaderId = P::TargetHeaderId>;
/// Ask headers relay to relay finalized headers up to (and including) given header
/// from race source to race target.
async fn require_source_header(&self, id: P::SourceHeaderId);
///
/// The client may return `Some(_)`, which means that nothing has happened yet and
/// the caller must generate and append proof to the batch transaction
/// to actually send it (along with required header) to the node.
///
/// If the function has returned `None`, it means that the caller must now wait for the
/// required header `id` to appear at the target client.
async fn require_source_header(
&self,
id: P::SourceHeaderId,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
/// Return nonces that are known to the target client.
async fn nonces(
@@ -242,6 +259,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = true;
let mut source_nonces_required = false;
let mut source_required_header = None;
let source_nonces = futures::future::Fuse::terminated();
let source_generate_proof = futures::future::Fuse::terminated();
let source_go_offline_future = futures::future::Fuse::terminated();
@@ -250,6 +268,8 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
let mut target_client_is_online = true;
let mut target_best_nonces_required = false;
let mut target_finalized_nonces_required = false;
let mut target_batch_transaction = None;
let target_require_source_header = futures::future::Fuse::terminated();
let target_best_nonces = futures::future::Fuse::terminated();
let target_finalized_nonces = futures::future::Fuse::terminated();
let target_submit_proof = futures::future::Fuse::terminated();
@@ -262,6 +282,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
source_generate_proof,
source_go_offline_future,
race_target_updated,
target_require_source_header,
target_best_nonces,
target_finalized_nonces,
target_submit_proof,
@@ -326,13 +347,10 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
).fail_if_connection_error(FailedClient::Source)?;
// ask for more headers if we have nonces to deliver and required headers are missing
let required_source_header_id = race_state
source_required_header = race_state
.best_finalized_source_header_id_at_best_target
.as_ref()
.and_then(|best|strategy.required_source_header_at_target(best));
if let Some(required_source_header_id) = required_source_header_id {
race_target.require_source_header(required_source_header_id).await;
}
.and_then(|best| strategy.required_source_header_at_target(best));
},
nonces = target_best_nonces => {
target_best_nonces_required = false;
@@ -378,6 +396,28 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
},
// proof generation and submission
maybe_batch_transaction = target_require_source_header => {
source_required_header = None;
target_client_is_online = process_future_result(
maybe_batch_transaction,
&mut target_retry_backoff,
|maybe_batch_transaction: Option<TC::BatchTransaction>| {
log::debug!(
target: "bridge",
"Target {} client has been asked for more {} headers. Batch tx: {:?}",
P::target_name(),
P::source_name(),
maybe_batch_transaction.is_some(),
);
target_batch_transaction = maybe_batch_transaction;
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error asking for source headers at {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
},
proof = source_generate_proof => {
source_client_is_online = process_future_result(
proof,
@@ -409,6 +449,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
P::target_name(),
);
target_batch_transaction = None;
race_state.nonces_to_submit = None;
race_state.nonces_submitted = Some(artifacts.nonces);
target_tx_tracker.set(artifacts.tx_tracker.wait().fuse());
@@ -479,8 +520,23 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
if source_client_is_online {
source_client_is_online = false;
// if we've started to submit batch transaction, let's prioritize it
let expected_race_state =
if let Some(ref target_batch_transaction) = target_batch_transaction {
// when selecting nonces for the batch transaction, we assume that the required
// source header is already at the target chain
let required_source_header_at_target =
target_batch_transaction.required_header_id();
let mut expected_race_state = race_state.clone();
expected_race_state.best_finalized_source_header_id_at_best_target =
Some(required_source_header_at_target);
expected_race_state
} else {
race_state.clone()
};
let nonces_to_deliver =
select_nonces_to_deliver(race_state.clone(), &mut strategy).await;
select_nonces_to_deliver(expected_race_state, &mut strategy).await;
let best_at_source = strategy.best_at_source();
if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
@@ -491,6 +547,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
nonces_range,
at_block,
);
source_generate_proof.set(
race_source.generate_proof(at_block, nonces_range, proof_parameters).fuse(),
);
@@ -518,17 +575,45 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
target_client_is_online = false;
if let Some((at_block, nonces_range, proof)) = race_state.nonces_to_submit.as_ref() {
log::debug!(
target: "bridge",
"Going to submit proof of messages in range {:?} to {} node",
nonces_range,
P::target_name(),
);
target_submit_proof.set(
race_target
.submit_proof(at_block.clone(), nonces_range.clone(), proof.clone())
.fuse(),
);
if let Some(target_batch_transaction) = target_batch_transaction.take() {
log::debug!(
target: "bridge",
"Going to submit batch transaction with header {:?} and proof of messages in range {:?} to {} node",
target_batch_transaction.required_header_id(),
nonces_range,
P::target_name(),
);
let nonces = nonces_range.clone();
target_submit_proof.set(
target_batch_transaction
.append_proof_and_send(proof.clone())
.map(|result| {
result
.map(|tx_tracker| NoncesSubmitArtifacts { nonces, tx_tracker })
})
.left_future()
.fuse(),
);
} else {
log::debug!(
target: "bridge",
"Going to submit proof of messages in range {:?} to {} node",
nonces_range,
P::target_name(),
);
target_submit_proof.set(
race_target
.submit_proof(at_block.clone(), nonces_range.clone(), proof.clone())
.right_future()
.fuse(),
);
}
} else if let Some(source_required_header) = source_required_header.clone() {
log::debug!(target: "bridge", "Going to require {} header {:?} at {}", P::source_name(), source_required_header, P::target_name());
target_require_source_header
.set(race_target.require_source_header(source_required_header).fuse());
} else if target_best_nonces_required {
log::debug!(target: "bridge", "Asking {} about best message nonces", P::target_name());
let at_block = race_state
@@ -155,9 +155,13 @@ where
{
type Error = C::Error;
type TargetNoncesData = ();
type BatchTransaction = C::BatchTransaction;
type TransactionTracker = C::TransactionTracker;
async fn require_source_header(&self, id: TargetHeaderIdOf<P>) {
/// Forward the "make target header `id` available at the source chain" request to
/// the wrapped message-lane client; an optional batch transaction is relayed back
/// to the race loop unchanged.
async fn require_source_header(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<Option<C::BatchTransaction>, Self::Error> {
let lane_client = &self.client;
lane_client.require_target_header_on_source(id).await
}
+1 -1
View File
@@ -19,7 +19,7 @@
pub use bp_runtime::HeaderId;
pub use error::Error;
pub use relay_loop::{relay_loop, relay_metrics};
pub use sp_runtime::traits::UniqueSaturatedInto;
pub use sp_runtime::traits::{UniqueSaturatedFrom, UniqueSaturatedInto};
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};