From 542ebb565463ac232d0852663f91d46d96424ddf Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 25 May 2022 10:54:20 +0300 Subject: [PATCH] Complex RialtoParachain <> Millau relay (#1405) * complex parachain relay * fix spelling --- bridges/bin/millau/runtime/src/lib.rs | 2 +- .../bin/millau/runtime/src/rialto_messages.rs | 2 +- .../runtime/src/rialto_parachain_messages.rs | 19 +- bridges/primitives/chain-millau/src/lib.rs | 3 - .../chain-rialto-parachain/src/lib.rs | 8 + bridges/primitives/chain-rialto/src/lib.rs | 2 + bridges/relays/bin-substrate/Cargo.toml | 1 + .../relays/bin-substrate/src/chains/millau.rs | 9 + .../src/chains/rialto_parachains_to_millau.rs | 16 +- bridges/relays/bin-substrate/src/cli/mod.rs | 2 +- .../src/cli/relay_headers_and_messages.rs | 397 +++++++-- .../bin-substrate/src/cli/relay_parachains.rs | 29 +- bridges/relays/client-rialto/src/lib.rs | 9 +- bridges/relays/client-substrate/src/chain.rs | 14 + bridges/relays/client-substrate/src/client.rs | 5 + bridges/relays/client-substrate/src/lib.rs | 4 +- bridges/relays/lib-substrate-relay/src/lib.rs | 5 +- .../lib-substrate-relay/src/messages_lane.rs | 9 +- .../src/messages_source.rs | 55 +- .../src/messages_target.rs | 13 +- .../headers.rs} | 22 +- .../lib-substrate-relay/src/on_demand/mod.rs | 35 + .../src/on_demand/parachains.rs | 797 ++++++++++++++++++ .../lib-substrate-relay/src/parachains/mod.rs | 110 +++ .../src/parachains/source.rs | 163 ++++ .../target.rs} | 130 +-- .../src/parachains_source.rs | 91 -- 27 files changed, 1639 insertions(+), 313 deletions(-) rename bridges/relays/lib-substrate-relay/src/{on_demand_headers.rs => on_demand/headers.rs} (96%) create mode 100644 bridges/relays/lib-substrate-relay/src/on_demand/mod.rs create mode 100644 bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs create mode 100644 bridges/relays/lib-substrate-relay/src/parachains/mod.rs create mode 100644 bridges/relays/lib-substrate-relay/src/parachains/source.rs rename bridges/relays/lib-substrate-relay/src/{parachains_target.rs => parachains/target.rs} (50%) delete mode 100644 bridges/relays/lib-substrate-relay/src/parachains_source.rs diff --git a/bridges/bin/millau/runtime/src/lib.rs b/bridges/bin/millau/runtime/src/lib.rs index 23cf4d0b05..f032eec844 100644 --- a/bridges/bin/millau/runtime/src/lib.rs +++ b/bridges/bin/millau/runtime/src/lib.rs @@ -805,7 +805,7 @@ impl_runtime_apis! { fn best_finalized() -> (bp_rialto::BlockNumber, bp_rialto::Hash) { // the parachains finality pallet is never decoding parachain heads, so it is // only done in the integration code - use crate::rialto_parachain_messages::RIALTO_PARACHAIN_ID; + use bp_rialto_parachain::RIALTO_PARACHAIN_ID; let best_rialto_parachain_head = pallet_bridge_parachains::Pallet::< Runtime, WitRialtoParachainsInstance, diff --git a/bridges/bin/millau/runtime/src/rialto_messages.rs b/bridges/bin/millau/runtime/src/rialto_messages.rs index aac568e077..dad7591147 100644 --- a/bridges/bin/millau/runtime/src/rialto_messages.rs +++ b/bridges/bin/millau/runtime/src/rialto_messages.rs @@ -97,7 +97,7 @@ impl MessageBridge for WithRialtoMessageBridge { bridged_to_this_conversion_rate_override: Option, ) -> bp_millau::Balance { let conversion_rate = bridged_to_this_conversion_rate_override - .unwrap_or_else(|| RialtoToMillauConversionRate::get()); + .unwrap_or_else(RialtoToMillauConversionRate::get); bp_millau::Balance::try_from(conversion_rate.saturating_mul_int(bridged_balance)) .unwrap_or(bp_millau::Balance::MAX) } diff --git a/bridges/bin/millau/runtime/src/rialto_parachain_messages.rs b/bridges/bin/millau/runtime/src/rialto_parachain_messages.rs index c4d8f2c10c..3ac2690888 100644 --- a/bridges/bin/millau/runtime/src/rialto_parachain_messages.rs +++ b/bridges/bin/millau/runtime/src/rialto_parachain_messages.rs @@ -19,7 +19,7 @@ use crate::Runtime; use bp_messages::{ - source_chain::{SenderOrigin, TargetHeaderChain}, + source_chain::TargetHeaderChain, target_chain::{ProvedMessages, SourceHeaderChain}, InboundLaneData, LaneId, Message, MessageNonce, Parameter as MessagesParameter, }; @@ -36,13 +36,6 @@ use scale_info::TypeInfo; use sp_runtime::{traits::Saturating, FixedPointNumber, FixedU128}; use sp_std::convert::TryFrom; -/// Identifier of RialtoParachain in the Rialto relay chain. -/// -/// This identifier is not something that is declared either by Rialto or RialtoParachain. This -/// is an identifier of registration. So in theory it may be changed. But since bridge is going -/// to be deployed after parachain registration AND since parachain de-registration is highly -/// likely impossible, it is fine to declare this constant here. -pub const RIALTO_PARACHAIN_ID: u32 = 2000; /// Weight of 2 XCM instructions is for simple `Trap(42)` program, coming through bridge /// (it is prepended with `UniversalOrigin` instruction). It is used just for simplest manual /// tests, confirming that we don't break encoding somewhere between. @@ -109,7 +102,7 @@ impl MessageBridge for WithRialtoParachainMessageBridge { bridged_to_this_conversion_rate_override: Option, ) -> bp_millau::Balance { let conversion_rate = bridged_to_this_conversion_rate_override - .unwrap_or_else(|| RialtoParachainToMillauConversionRate::get()); + .unwrap_or_else(RialtoParachainToMillauConversionRate::get); bp_millau::Balance::try_from(conversion_rate.saturating_mul_int(bridged_balance)) .unwrap_or(bp_millau::Balance::MAX) } @@ -132,8 +125,8 @@ impl messages::ThisChainWithMessages for Millau { type Call = crate::Call; type Origin = crate::Origin; - fn is_message_accepted(send_origin: &Self::Origin, lane: &LaneId) -> bool { - (*lane == [0, 0, 0, 0] || *lane == [0, 0, 0, 1]) && send_origin.linked_account().is_some() + fn is_message_accepted(_send_origin: &Self::Origin, lane: &LaneId) -> bool { + *lane == [0, 0, 0, 0] || *lane == [0, 0, 0, 1] } fn maximal_pending_messages_at_outbound_lane() -> MessageNonce { @@ -260,7 +253,7 @@ impl TargetHeaderChain(ParaId(RIALTO_PARACHAIN_ID), proof) + >(ParaId(bp_rialto_parachain::RIALTO_PARACHAIN_ID), proof) } } @@ -282,7 +275,7 @@ impl SourceHeaderChain for RialtoParachain { bp_rialto_parachain::Header, Runtime, crate::WitRialtoParachainsInstance, - >(ParaId(RIALTO_PARACHAIN_ID), proof, messages_count) + >(ParaId(bp_rialto_parachain::RIALTO_PARACHAIN_ID), proof, messages_count) } } diff --git a/bridges/primitives/chain-millau/src/lib.rs b/bridges/primitives/chain-millau/src/lib.rs index d285283448..6cc417756a 100644 --- a/bridges/primitives/chain-millau/src/lib.rs +++ b/bridges/primitives/chain-millau/src/lib.rs @@ -284,9 +284,6 @@ pub const WITH_MILLAU_MESSAGES_PALLET_NAME: &str = "BridgeMillauMessages"; /// Name of the Rialto->Millau (actually DOT->KSM) conversion rate stored in the Millau runtime. pub const RIALTO_TO_MILLAU_CONVERSION_RATE_PARAMETER_NAME: &str = "RialtoToMillauConversionRate"; -/// Name of the With-Rialto parachains bridge pallet name in the Millau runtime. -pub const BRIDGE_PARAS_PALLET_NAME: &str = "BridgeRialtoParachains"; - /// Name of the `MillauFinalityApi::best_finalized` runtime method. pub const BEST_FINALIZED_MILLAU_HEADER_METHOD: &str = "MillauFinalityApi_best_finalized"; diff --git a/bridges/primitives/chain-rialto-parachain/src/lib.rs b/bridges/primitives/chain-rialto-parachain/src/lib.rs index 0ea849f6fe..598286b2f3 100644 --- a/bridges/primitives/chain-rialto-parachain/src/lib.rs +++ b/bridges/primitives/chain-rialto-parachain/src/lib.rs @@ -32,6 +32,14 @@ use sp_runtime::{ }; use sp_std::vec::Vec; +/// Identifier of RialtoParachain in the Rialto relay chain. +/// +/// This identifier is not something that is declared either by Rialto or RialtoParachain. This +/// is an identifier of registration. So in theory it may be changed. But since bridge is going +/// to be deployed after parachain registration AND since parachain de-registration is highly +/// likely impossible, it is fine to declare this constant here. +pub const RIALTO_PARACHAIN_ID: u32 = 2000; + /// Number of extra bytes (excluding size of storage value itself) of storage proof, built at /// RialtoParachain chain. This mostly depends on number of entries (and their density) in the /// storage trie. Some reserve is reserved to account future chain growth. diff --git a/bridges/primitives/chain-rialto/src/lib.rs b/bridges/primitives/chain-rialto/src/lib.rs index 56ef63f133..ca3381a25f 100644 --- a/bridges/primitives/chain-rialto/src/lib.rs +++ b/bridges/primitives/chain-rialto/src/lib.rs @@ -228,6 +228,8 @@ frame_support::parameter_types! { pub const WITH_RIALTO_GRANDPA_PALLET_NAME: &str = "BridgeRialtoGrandpa"; /// Name of the With-Rialto messages pallet instance that is deployed at bridged chains. pub const WITH_RIALTO_MESSAGES_PALLET_NAME: &str = "BridgeRialtoMessages"; +/// Name of the With-Rialto parachains bridge pallet instance that is deployed at bridged chains. +pub const WITH_RIALTO_BRIDGE_PARAS_PALLET_NAME: &str = "BridgeRialtoParachains"; /// Name of the Millau->Rialto (actually KSM->DOT) conversion rate stored in the Rialto runtime. pub const MILLAU_TO_RIALTO_CONVERSION_RATE_PARAMETER_NAME: &str = "MillauToRialtoConversionRate"; diff --git a/bridges/relays/bin-substrate/Cargo.toml b/bridges/relays/bin-substrate/Cargo.toml index 18cf0cae4a..c7204be320 100644 --- a/bridges/relays/bin-substrate/Cargo.toml +++ b/bridges/relays/bin-substrate/Cargo.toml @@ -41,6 +41,7 @@ messages-relay = { path = "../messages" } millau-runtime = { path = "../../bin/millau/runtime" } pallet-bridge-grandpa = { path = "../../modules/grandpa" } pallet-bridge-messages = { path = "../../modules/messages" } +pallet-bridge-parachains = { path = "../../modules/parachains" } parachains-relay = { path = "../parachains" } relay-kusama-client = { path = "../client-kusama" } relay-millau-client = { path = "../client-millau" } diff --git a/bridges/relays/bin-substrate/src/chains/millau.rs b/bridges/relays/bin-substrate/src/chains/millau.rs index a9e99771f5..4b31e423ac 100644 --- a/bridges/relays/bin-substrate/src/chains/millau.rs +++ b/bridges/relays/bin-substrate/src/chains/millau.rs @@ -43,6 +43,15 @@ impl CliEncodeMessage for Millau { }, ) .into(), + bridge::MILLAU_TO_RIALTO_PARACHAIN_INDEX => + millau_runtime::Call::BridgeRialtoParachainMessages( + millau_runtime::MessagesCall::send_message { + lane_id: lane, + payload, + delivery_and_dispatch_fee: fee, + }, + ) + .into(), _ => anyhow::bail!( "Unsupported target bridge pallet with instance index: {}", bridge_instance_index diff --git a/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs b/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs index dddf739029..078c24a3e3 100644 --- a/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs +++ b/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs @@ -19,7 +19,10 @@ use parachains_relay::ParachainsPipeline; use relay_millau_client::Millau; use relay_rialto_client::Rialto; -use substrate_relay_helper::parachains_target::DirectSubmitParachainHeadsCallBuilder; +use relay_rialto_parachain_client::RialtoParachain; +use substrate_relay_helper::parachains::{ + DirectSubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline, +}; /// Rialto-to-Millau parachains sync description. #[derive(Clone, Debug)] @@ -30,6 +33,17 @@ impl ParachainsPipeline for RialtoParachainsToMillau { type TargetChain = Millau; } +impl SubstrateParachainsPipeline for RialtoParachainsToMillau { + type SourceParachain = RialtoParachain; + type SourceRelayChain = Rialto; + type TargetChain = Millau; + + type SubmitParachainHeadsCallBuilder = RialtoParachainsToMillauSubmitParachainHeadsCallBuilder; + type TransactionSignScheme = Millau; + + const SOURCE_PARACHAIN_PARA_ID: u32 = bp_rialto_parachain::RIALTO_PARACHAIN_ID; +} + /// `submit_parachain_heads` call builder for Rialto-to-Millau parachains sync pipeline. pub type RialtoParachainsToMillauSubmitParachainHeadsCallBuilder = DirectSubmitParachainHeadsCallBuilder< diff --git a/bridges/relays/bin-substrate/src/cli/mod.rs b/bridges/relays/bin-substrate/src/cli/mod.rs index bf2bede5d2..4748e42e3f 100644 --- a/bridges/relays/bin-substrate/src/cli/mod.rs +++ b/bridges/relays/bin-substrate/src/cli/mod.rs @@ -238,7 +238,7 @@ impl HexBytes { } /// Prometheus metrics params. -#[derive(StructOpt)] +#[derive(Clone, Debug, StructOpt)] pub struct PrometheusParams { /// Do not expose a Prometheus metric endpoint. #[structopt(long)] diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs b/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs index 1b6bc41301..c3c0591047 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs @@ -26,17 +26,25 @@ use futures::{FutureExt, TryFutureExt}; use structopt::StructOpt; use strum::VariantNames; +use async_std::sync::Arc; +use bp_polkadot_core::parachains::ParaHash; use codec::Encode; use messages_relay::relay_strategy::MixStrategy; +use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; use relay_substrate_client::{ - AccountIdOf, CallOf, Chain, ChainRuntimeVersion, Client, SignParam, TransactionSignScheme, - UnsignedTransaction, + AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, ChainRuntimeVersion, Client, + SignParam, TransactionSignScheme, UnsignedTransaction, }; use relay_utils::metrics::MetricsParams; use sp_core::{Bytes, Pair}; use substrate_relay_helper::{ - finality::SubstrateFinalitySyncPipeline, messages_lane::MessagesRelayParams, - on_demand_headers::OnDemandHeadersRelay, TransactionParams, + finality::SubstrateFinalitySyncPipeline, + messages_lane::MessagesRelayParams, + on_demand::{ + headers::OnDemandHeadersRelay, parachains::OnDemandParachainsRelay, OnDemandRelay, + }, + parachains::SubstrateParachainsPipeline, + TransactionParams, }; use crate::{ @@ -56,6 +64,7 @@ pub(crate) const CONVERSION_RATE_ALLOWED_DIFFERENCE_RATIO: f64 = 0.05; #[derive(StructOpt)] pub enum RelayHeadersAndMessages { MillauRialto(MillauRialtoHeadersAndMessages), + MillauRialtoParachain(MillauRialtoParachainHeadersAndMessages), RococoWococo(RococoWococoHeadersAndMessages), KusamaPolkadot(KusamaPolkadotHeadersAndMessages), } @@ -83,6 +92,33 @@ pub struct HeadersAndMessagesSharedParams { // terminology, which is unusable for both-way relays (if you're relaying headers from Rialto to // Millau and from Millau to Rialto, then which chain is source?). macro_rules! declare_bridge_options { + // chain, parachain, relay-chain-of-parachain + ($chain1:ident, $chain2:ident, $chain3:ident) => { + paste::item! { + #[doc = $chain1 ", " $chain2 " and " $chain3 " headers+messages relay params."] + #[derive(StructOpt)] + pub struct [<$chain1 $chain2 HeadersAndMessages>] { + #[structopt(flatten)] + shared: HeadersAndMessagesSharedParams, + #[structopt(flatten)] + left: [<$chain1 ConnectionParams>], + #[structopt(flatten)] + left_sign: [<$chain1 SigningParams>], + #[structopt(flatten)] + left_messages_pallet_owner: [<$chain1 MessagesPalletOwnerSigningParams>], + #[structopt(flatten)] + right: [<$chain2 ConnectionParams>], + #[structopt(flatten)] + right_sign: [<$chain2 SigningParams>], + #[structopt(flatten)] + right_messages_pallet_owner: [<$chain2 MessagesPalletOwnerSigningParams>], + #[structopt(flatten)] + right_relay: [<$chain3 ConnectionParams>], + } + } + + declare_bridge_options!({ implement }, $chain1, $chain2); + }; ($chain1:ident, $chain2:ident) => { paste::item! { #[doc = $chain1 " and " $chain2 " headers+messages relay params."] @@ -103,7 +139,12 @@ macro_rules! declare_bridge_options { #[structopt(flatten)] right_messages_pallet_owner: [<$chain2 MessagesPalletOwnerSigningParams>], } + } + declare_bridge_options!({ implement }, $chain1, $chain2); + }; + ({ implement }, $chain1:ident, $chain2:ident) => { + paste::item! { impl From for [<$chain1 $chain2 HeadersAndMessages>] { fn from(relay_params: RelayHeadersAndMessages) -> [<$chain1 $chain2 HeadersAndMessages>] { match relay_params { @@ -125,11 +166,6 @@ macro_rules! select_bridge { type Left = relay_millau_client::Millau; type Right = relay_rialto_client::Rialto; - type LeftToRightFinality = - crate::chains::millau_headers_to_rialto::MillauFinalityToRialto; - type RightToLeftFinality = - crate::chains::rialto_headers_to_millau::RialtoFinalityToMillau; - type LeftAccountIdConverter = bp_millau::AccountIdConverter; type RightAccountIdConverter = bp_rialto::AccountIdConverter; @@ -138,6 +174,106 @@ macro_rules! select_bridge { rialto_messages_to_millau::RialtoMessagesToMillau as RightToLeftMessageLane, }; + async fn start_on_demand_relays( + params: &Params, + left_client: Client, + right_client: Client, + ) -> anyhow::Result<( + Arc>>, + Arc>>, + )> { + start_on_demand_relay_to_relay::< + Left, + Right, + crate::chains::millau_headers_to_rialto::MillauFinalityToRialto, + crate::chains::rialto_headers_to_millau::RialtoFinalityToMillau, + >( + left_client, + right_client, + TransactionParams { + mortality: params.right_sign.transactions_mortality()?, + signer: params.right_sign.to_keypair::()?, + }, + TransactionParams { + mortality: params.left_sign.transactions_mortality()?, + signer: params.left_sign.to_keypair::()?, + }, + params.shared.only_mandatory_headers, + params.shared.only_mandatory_headers, + params.left.can_start_version_guard(), + params.right.can_start_version_guard(), + ).await + } + + async fn left_create_account( + _left_client: Client, + _left_sign: ::AccountKeyPair, + _account_id: AccountIdOf, + ) -> anyhow::Result<()> { + Err(anyhow::format_err!("Account creation is not supported by this bridge")) + } + + async fn right_create_account( + _right_client: Client, + _right_sign: ::AccountKeyPair, + _account_id: AccountIdOf, + ) -> anyhow::Result<()> { + Err(anyhow::format_err!("Account creation is not supported by this bridge")) + } + + $generic + }, + RelayHeadersAndMessages::MillauRialtoParachain(_) => { + type Params = MillauRialtoParachainHeadersAndMessages; + + type Left = relay_millau_client::Millau; + type Right = relay_rialto_parachain_client::RialtoParachain; + + type LeftAccountIdConverter = bp_millau::AccountIdConverter; + type RightAccountIdConverter = bp_rialto_parachain::AccountIdConverter; + + use crate::chains::{ + millau_messages_to_rialto_parachain::MillauMessagesToRialtoParachain as LeftToRightMessageLane, + rialto_parachain_messages_to_millau::RialtoParachainMessagesToMillau as RightToLeftMessageLane, + }; + + async fn start_on_demand_relays( + params: &Params, + left_client: Client, + right_client: Client, + ) -> anyhow::Result<( + Arc>>, + Arc>>, + )> { + type RightRelayChain = relay_rialto_client::Rialto; + let rialto_relay_chain_client = params.right_relay.to_client::().await?; // TODO: should be the relaychain connection params + + start_on_demand_relay_to_parachain::< + Left, + Right, + RightRelayChain, + crate::chains::millau_headers_to_rialto_parachain::MillauFinalityToRialtoParachain, + crate::chains::rialto_headers_to_millau::RialtoFinalityToMillau, + crate::chains::rialto_parachains_to_millau::RialtoParachainsToMillau, + >( + left_client, + right_client, + rialto_relay_chain_client, + TransactionParams { + mortality: params.right_sign.transactions_mortality()?, + signer: params.right_sign.to_keypair::()?, + }, + TransactionParams { + mortality: params.left_sign.transactions_mortality()?, + signer: params.left_sign.to_keypair::()?, + }, + params.shared.only_mandatory_headers, + params.shared.only_mandatory_headers, + params.left.can_start_version_guard(), + params.right.can_start_version_guard(), + ).await + } + async fn left_create_account( _left_client: Client, _left_sign: ::AccountKeyPair, @@ -162,11 +298,6 @@ macro_rules! select_bridge { type Left = relay_rococo_client::Rococo; type Right = relay_wococo_client::Wococo; - type LeftToRightFinality = - crate::chains::rococo_headers_to_wococo::RococoFinalityToWococo; - type RightToLeftFinality = - crate::chains::wococo_headers_to_rococo::WococoFinalityToRococo; - type LeftAccountIdConverter = bp_rococo::AccountIdConverter; type RightAccountIdConverter = bp_wococo::AccountIdConverter; @@ -175,6 +306,37 @@ macro_rules! select_bridge { wococo_messages_to_rococo::WococoMessagesToRococo as RightToLeftMessageLane, }; + async fn start_on_demand_relays( + params: &Params, + left_client: Client, + right_client: Client, + ) -> anyhow::Result<( + Arc>>, + Arc>>, + )> { + start_on_demand_relay_to_relay::< + Left, + Right, + crate::chains::rococo_headers_to_wococo::RococoFinalityToWococo, + crate::chains::wococo_headers_to_rococo::WococoFinalityToRococo, + >( + left_client, + right_client, + TransactionParams { + mortality: params.right_sign.transactions_mortality()?, + signer: params.right_sign.to_keypair::()?, + }, + TransactionParams { + mortality: params.left_sign.transactions_mortality()?, + signer: params.left_sign.to_keypair::()?, + }, + params.shared.only_mandatory_headers, + params.shared.only_mandatory_headers, + params.left.can_start_version_guard(), + params.right.can_start_version_guard(), + ).await + } + async fn left_create_account( left_client: Client, left_sign: ::AccountKeyPair, @@ -219,11 +381,6 @@ macro_rules! select_bridge { type Left = relay_kusama_client::Kusama; type Right = relay_polkadot_client::Polkadot; - type LeftToRightFinality = - crate::chains::kusama_headers_to_polkadot::KusamaFinalityToPolkadot; - type RightToLeftFinality = - crate::chains::polkadot_headers_to_kusama::PolkadotFinalityToKusama; - type LeftAccountIdConverter = bp_kusama::AccountIdConverter; type RightAccountIdConverter = bp_polkadot::AccountIdConverter; @@ -232,6 +389,37 @@ macro_rules! select_bridge { polkadot_messages_to_kusama::PolkadotMessagesToKusama as RightToLeftMessageLane, }; + async fn start_on_demand_relays( + params: &Params, + left_client: Client, + right_client: Client, + ) -> anyhow::Result<( + Arc>>, + Arc>>, + )> { + start_on_demand_relay_to_relay::< + Left, + Right, + crate::chains::kusama_headers_to_polkadot::KusamaFinalityToPolkadot, + crate::chains::polkadot_headers_to_kusama::PolkadotFinalityToKusama, + >( + left_client, + right_client, + TransactionParams { + mortality: params.right_sign.transactions_mortality()?, + signer: params.right_sign.to_keypair::()?, + }, + TransactionParams { + mortality: params.left_sign.transactions_mortality()?, + signer: params.left_sign.to_keypair::()?, + }, + params.shared.only_mandatory_headers, + params.shared.only_mandatory_headers, + params.left.can_start_version_guard(), + params.right.can_start_version_guard(), + ).await + } + async fn left_create_account( left_client: Client, left_sign: ::AccountKeyPair, @@ -277,12 +465,14 @@ macro_rules! select_bridge { // All supported chains. declare_chain_options!(Millau, millau); declare_chain_options!(Rialto, rialto); +declare_chain_options!(RialtoParachain, rialto_parachain); declare_chain_options!(Rococo, rococo); declare_chain_options!(Wococo, wococo); declare_chain_options!(Kusama, kusama); declare_chain_options!(Polkadot, polkadot); // All supported bridges. declare_bridge_options!(Millau, Rialto); +declare_bridge_options!(Millau, RialtoParachain, Rialto); declare_bridge_options!(Rococo, Wococo); declare_bridge_options!(Kusama, Polkadot); @@ -303,12 +493,12 @@ impl RelayHeadersAndMessages { let right_messages_pallet_owner = params.right_messages_pallet_owner.to_keypair::()?; - let lanes = params.shared.lane; + let lanes = params.shared.lane.clone(); let relayer_mode = params.shared.relayer_mode.into(); let relay_strategy = MixStrategy::new(relayer_mode); // create metrics registry and register standalone metrics - let metrics_params: MetricsParams = params.shared.prometheus_params.into(); + let metrics_params: MetricsParams = params.shared.prometheus_params.clone().into(); let metrics_params = relay_utils::relay_metrics(metrics_params).into_params(); let left_to_right_metrics = substrate_relay_helper::messages_metrics::standalone_metrics::< @@ -448,38 +638,8 @@ impl RelayHeadersAndMessages { .await?; // start on-demand header relays - let left_to_right_transaction_params = TransactionParams { - mortality: right_transactions_mortality, - signer: right_sign.clone(), - }; - let right_to_left_transaction_params = TransactionParams { - mortality: left_transactions_mortality, - signer: left_sign.clone(), - }; - LeftToRightFinality::start_relay_guards( - &right_client, - &left_to_right_transaction_params, - params.right.can_start_version_guard(), - ) - .await?; - RightToLeftFinality::start_relay_guards( - &left_client, - &right_to_left_transaction_params, - params.left.can_start_version_guard(), - ) - .await?; - let left_to_right_on_demand_headers = OnDemandHeadersRelay::new::( - left_client.clone(), - right_client.clone(), - left_to_right_transaction_params, - params.shared.only_mandatory_headers, - ); - let right_to_left_on_demand_headers = OnDemandHeadersRelay::new::( - right_client.clone(), - left_client.clone(), - right_to_left_transaction_params, - params.shared.only_mandatory_headers, - ); + let (left_to_right_on_demand_headers, right_to_left_on_demand_headers) = + start_on_demand_relays(¶ms, left_client.clone(), right_client.clone()).await?; // Need 2x capacity since we consider both directions for each lane let mut message_relays = Vec::with_capacity(lanes.len() * 2); for lane in lanes { @@ -543,6 +703,137 @@ impl RelayHeadersAndMessages { } } +/// Start bidirectional on-demand headers <> headers relay. +async fn start_on_demand_relay_to_relay( + left_client: Client, + right_client: Client, + left_to_right_transaction_params: TransactionParams>, + right_to_left_transaction_params: TransactionParams>, + left_to_right_only_mandatory_headers: bool, + right_to_left_only_mandatory_headers: bool, + left_can_start_version_guard: bool, + right_can_start_version_guard: bool, +) -> anyhow::Result<( + Arc>>, + Arc>>, +)> +where + LC: Chain + TransactionSignScheme, + RC: Chain + TransactionSignScheme, + LR: SubstrateFinalitySyncPipeline< + SourceChain = LC, + TargetChain = RC, + TransactionSignScheme = RC, + >, + RL: SubstrateFinalitySyncPipeline< + SourceChain = RC, + TargetChain = LC, + TransactionSignScheme = LC, + >, + AccountIdOf: From<<::AccountKeyPair as Pair>::Public>, + AccountIdOf: From<<::AccountKeyPair as Pair>::Public>, +{ + LR::start_relay_guards( + &right_client, + &left_to_right_transaction_params, + right_can_start_version_guard, + ) + .await?; + RL::start_relay_guards( + &left_client, + &right_to_left_transaction_params, + left_can_start_version_guard, + ) + .await?; + let left_to_right_on_demand_headers = OnDemandHeadersRelay::new::( + left_client.clone(), + right_client.clone(), + left_to_right_transaction_params, + left_to_right_only_mandatory_headers, + ); + let right_to_left_on_demand_headers = OnDemandHeadersRelay::new::( + right_client.clone(), + left_client.clone(), + right_to_left_transaction_params, + right_to_left_only_mandatory_headers, + ); + + Ok((Arc::new(left_to_right_on_demand_headers), Arc::new(right_to_left_on_demand_headers))) +} + +/// Start bidirectional on-demand headers <> parachains relay. +async fn start_on_demand_relay_to_parachain( + left_client: Client, + right_client: Client, + right_relay_client: Client, + left_to_right_transaction_params: TransactionParams>, + right_to_left_transaction_params: TransactionParams>, + left_to_right_only_mandatory_headers: bool, + right_to_left_only_mandatory_headers: bool, + left_can_start_version_guard: bool, + right_can_start_version_guard: bool, +) -> anyhow::Result<( + Arc>>, + Arc>>, +)> +where + LC: Chain + TransactionSignScheme, + RC: Chain + TransactionSignScheme, + RRC: Chain + + TransactionSignScheme, + LR: SubstrateFinalitySyncPipeline< + SourceChain = LC, + TargetChain = RC, + TransactionSignScheme = RC, + >, + RRF: SubstrateFinalitySyncPipeline< + SourceChain = RRC, + TargetChain = LC, + TransactionSignScheme = LC, + >, + RL: SubstrateParachainsPipeline< + SourceRelayChain = RRC, + SourceParachain = RC, + TargetChain = LC, + TransactionSignScheme = LC, + >, + AccountIdOf: From<<::AccountKeyPair as Pair>::Public>, + AccountIdOf: From<<::AccountKeyPair as Pair>::Public>, +{ + LR::start_relay_guards( + &right_client, + &left_to_right_transaction_params, + right_can_start_version_guard, + ) + .await?; + RRF::start_relay_guards( + &left_client, + &right_to_left_transaction_params, + left_can_start_version_guard, + ) + .await?; + let left_to_right_on_demand_headers = OnDemandHeadersRelay::new::( + left_client.clone(), + right_client, + left_to_right_transaction_params, + left_to_right_only_mandatory_headers, + ); + let right_relay_to_left_on_demand_headers = OnDemandHeadersRelay::new::( + right_relay_client.clone(), + left_client.clone(), + right_to_left_transaction_params.clone(), + right_to_left_only_mandatory_headers, + ); + let right_to_left_on_demand_parachains = OnDemandParachainsRelay::new::( + right_relay_client, + left_client, + right_to_left_transaction_params, + Arc::new(right_relay_to_left_on_demand_headers), + ); + + Ok((Arc::new(left_to_right_on_demand_headers), Arc::new(right_to_left_on_demand_parachains))) +} + /// Sign and submit transaction with given call to the chain. async fn submit_signed_extrinsic>( client: Client, diff --git a/bridges/relays/bin-substrate/src/cli/relay_parachains.rs b/bridges/relays/bin-substrate/src/cli/relay_parachains.rs index 23ff1da642..8667ff7416 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_parachains.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_parachains.rs @@ -20,7 +20,8 @@ use relay_utils::metrics::{GlobalMetrics, StandaloneMetric}; use structopt::StructOpt; use strum::{EnumString, EnumVariantNames, VariantNames}; use substrate_relay_helper::{ - parachains_source::ParachainsSource, parachains_target::ParachainsTarget, TransactionParams, + parachains::{source::ParachainsSource, target::ParachainsTarget}, + TransactionParams, }; use crate::cli::{ @@ -54,15 +55,7 @@ macro_rules! select_bridge { ($bridge: expr, $generic: tt) => { match $bridge { RelayParachainsBridge::RialtoToMillau => { - use crate::chains::rialto_parachains_to_millau::{ - RialtoParachainsToMillau as Pipeline, - RialtoParachainsToMillauSubmitParachainHeadsCallBuilder as SubmitParachainHeadsCallBuilder, - }; - - use bp_millau::BRIDGE_PARAS_PALLET_NAME as BRIDGE_PARAS_PALLET_NAME_AT_TARGET; - use bp_rialto::PARAS_PALLET_NAME as PARAS_PALLET_NAME_AT_SOURCE; - - use relay_millau_client::Millau as TargetTransactionSignScheme; + use crate::chains::rialto_parachains_to_millau::RialtoParachainsToMillau as Pipeline; $generic }, @@ -78,25 +71,15 @@ impl RelayParachains { type TargetChain = ::TargetChain; let source_client = self.source.to_client::().await?; - let source_client = ParachainsSource::::new( - source_client, - PARAS_PALLET_NAME_AT_SOURCE.into(), - ); + let source_client = ParachainsSource::::new(source_client, None); let taret_transaction_params = TransactionParams { signer: self.target_sign.to_keypair::()?, mortality: self.target_sign.target_transactions_mortality, }; let target_client = self.target.to_client::().await?; - let target_client = ParachainsTarget::< - Pipeline, - TargetTransactionSignScheme, - SubmitParachainHeadsCallBuilder, - >::new( - target_client.clone(), - taret_transaction_params, - BRIDGE_PARAS_PALLET_NAME_AT_TARGET.into(), - ); + let target_client = + ParachainsTarget::::new(target_client.clone(), taret_transaction_params); let metrics_params: relay_utils::metrics::MetricsParams = self.prometheus_params.into(); GlobalMetrics::new()?.register_and_spawn(&metrics_params.registry)?; diff --git a/bridges/relays/client-rialto/src/lib.rs b/bridges/relays/client-rialto/src/lib.rs index 858227e808..e79556b966 100644 --- a/bridges/relays/client-rialto/src/lib.rs +++ b/bridges/relays/client-rialto/src/lib.rs @@ -21,7 +21,8 @@ use codec::{Compact, Decode, Encode}; use frame_support::weights::Weight; use relay_substrate_client::{ BalanceOf, Chain, ChainBase, ChainWithBalances, ChainWithGrandpa, ChainWithMessages, - Error as SubstrateError, IndexOf, SignParam, TransactionSignScheme, UnsignedTransaction, + Error as SubstrateError, IndexOf, RelayChain, SignParam, TransactionSignScheme, + UnsignedTransaction, }; use sp_core::{storage::StorageKey, Pair}; use sp_runtime::{generic::SignedPayload, traits::IdentifyAccount}; @@ -69,6 +70,12 @@ impl Chain for Rialto { type WeightToFee = bp_rialto::WeightToFee; } +impl RelayChain for Rialto { + const PARAS_PALLET_NAME: &'static str = bp_rialto::PARAS_PALLET_NAME; + const PARACHAINS_FINALITY_PALLET_NAME: &'static str = + bp_rialto::WITH_RIALTO_BRIDGE_PARAS_PALLET_NAME; +} + impl ChainWithGrandpa for Rialto { const WITH_CHAIN_GRANDPA_PALLET_NAME: &'static str = bp_rialto::WITH_RIALTO_GRANDPA_PALLET_NAME; } diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs index a55fa64d85..f4a2fcf262 100644 --- a/bridges/relays/client-substrate/src/chain.rs +++ b/bridges/relays/client-substrate/src/chain.rs @@ -64,6 +64,20 @@ pub trait Chain: ChainBase + Clone { type WeightToFee: WeightToFeePolynomial; } +/// Substrate-based relay chain that supports parachains. +/// +/// We assume that the parachains are supported using `runtime_parachains::paras` pallet. +pub trait RelayChain: Chain { + /// Name of the `runtime_parachains::paras` pallet in the runtime of this chain. + const PARAS_PALLET_NAME: &'static str; + /// Name of the bridge parachains pallet (used in `construct_runtime` macro call) that is + /// deployed at the **bridged** chain. + /// + /// We assume that all chains that are bridging with this `ChainWithGrandpa` are using + /// the same name. + const PARACHAINS_FINALITY_PALLET_NAME: &'static str; +} + /// Substrate-based chain that is using direct GRANDPA finality from minimal relay-client point of /// view. /// diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index a0426b99f9..0f9445a8d3 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -262,6 +262,11 @@ impl Client { Ok(*self.header_by_hash(self.best_finalized_header_hash().await?).await?.number()) } + /// Return header of the best finalized block. + pub async fn best_finalized_header(&self) -> Result { + self.header_by_hash(self.best_finalized_header_hash().await?).await + } + /// Returns the best Substrate header. pub async fn best_header(&self) -> Result where diff --git a/bridges/relays/client-substrate/src/lib.rs b/bridges/relays/client-substrate/src/lib.rs index d6cab3dd91..0234459f9d 100644 --- a/bridges/relays/client-substrate/src/lib.rs +++ b/bridges/relays/client-substrate/src/lib.rs @@ -33,8 +33,8 @@ use std::time::Duration; pub use crate::{ chain::{ AccountKeyPairOf, BlockWithJustification, CallOf, Chain, ChainWithBalances, - ChainWithGrandpa, ChainWithMessages, SignParam, TransactionSignScheme, TransactionStatusOf, - UnsignedTransaction, WeightToFeeOf, + ChainWithGrandpa, ChainWithMessages, RelayChain, SignParam, TransactionSignScheme, + TransactionStatusOf, UnsignedTransaction, WeightToFeeOf, }, client::{ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet, Subscription}, error::{Error, Result}, diff --git a/bridges/relays/lib-substrate-relay/src/lib.rs b/bridges/relays/lib-substrate-relay/src/lib.rs index 374ab91516..73dedb3a33 100644 --- a/bridges/relays/lib-substrate-relay/src/lib.rs +++ b/bridges/relays/lib-substrate-relay/src/lib.rs @@ -28,9 +28,8 @@ pub mod messages_lane; pub mod messages_metrics; pub mod messages_source; pub mod messages_target; -pub mod on_demand_headers; -pub mod parachains_source; -pub mod parachains_target; +pub mod on_demand; +pub mod parachains; /// Default relay loop stall timeout. If transactions generated by relay are immortal, then /// this timeout is used. diff --git a/bridges/relays/lib-substrate-relay/src/messages_lane.rs b/bridges/relays/lib-substrate-relay/src/messages_lane.rs index fadf5e6224..9d6a6f4862 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_lane.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_lane.rs @@ -21,10 +21,11 @@ use crate::{ messages_metrics::StandaloneMessagesMetrics, messages_source::{SubstrateMessagesProof, SubstrateMessagesSource}, messages_target::{SubstrateMessagesDeliveryProof, SubstrateMessagesTarget}, - on_demand_headers::OnDemandHeadersRelay, + on_demand::OnDemandRelay, TransactionParams, STALL_TIMEOUT, }; +use async_std::sync::Arc; use bp_messages::{LaneId, MessageNonce}; use bp_runtime::{AccountIdOf, Chain as _}; use bridge_runtime_common::messages::{ @@ -135,9 +136,11 @@ pub struct MessagesRelayParams { pub target_transaction_params: TransactionParams>, /// Optional on-demand source to target headers relay. - pub source_to_target_headers_relay: Option>, + pub source_to_target_headers_relay: + Option>>>, /// Optional on-demand target to source headers relay. - pub target_to_source_headers_relay: Option>, + pub target_to_source_headers_relay: + Option>>>, /// Identifier of lane that needs to be served. pub lane_id: LaneId, /// Metrics parameters. diff --git a/bridges/relays/lib-substrate-relay/src/messages_source.rs b/bridges/relays/lib-substrate-relay/src/messages_source.rs index 77dd2aed05..9c447c6b83 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_source.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_source.rs @@ -23,10 +23,11 @@ use crate::{ MessageLaneAdapter, ReceiveMessagesDeliveryProofCallBuilder, SubstrateMessageLane, }, messages_target::SubstrateMessagesDeliveryProof, - on_demand_headers::OnDemandHeadersRelay, + on_demand::OnDemandRelay, TransactionParams, }; +use async_std::sync::Arc; use async_trait::async_trait; use bp_messages::{ storage_keys::{operating_mode_key, outbound_lane_data_key}, @@ -66,7 +67,7 @@ pub struct SubstrateMessagesSource { target_client: Client, lane_id: LaneId, transaction_params: TransactionParams>, - target_to_source_headers_relay: Option>, + target_to_source_headers_relay: Option>>>, } impl SubstrateMessagesSource

{ @@ -76,7 +77,9 @@ impl SubstrateMessagesSource

{ target_client: Client, lane_id: LaneId, transaction_params: TransactionParams>, - target_to_source_headers_relay: Option>, + target_to_source_headers_relay: Option< + Arc>>, + >, ) -> Self { SubstrateMessagesSource { source_client, @@ -282,7 +285,7 @@ where async fn require_target_header_on_source(&self, id: TargetHeaderIdOf>) { if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay { - target_to_source_headers_relay.require_finalized_header(id).await; + target_to_source_headers_relay.require_more_headers(id.0).await; } } @@ -424,18 +427,13 @@ where let self_best_id = HeaderId(*self_best_header.number(), self_best_hash); // now let's read id of best finalized peer header at our best finalized block - let encoded_best_finalized_peer_on_self = self_client - .state_call( - best_finalized_header_id_method_name.into(), - Bytes(Vec::new()), - Some(self_best_hash), + let peer_on_self_best_finalized_id = + best_finalized_peer_header_at_self::( + self_client, + self_best_hash, + best_finalized_header_id_method_name, ) .await?; - let decoded_best_finalized_peer_on_self: (BlockNumberOf, HashOf) = - Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..]) - .map_err(SubstrateError::ResponseParseFailed)?; - let peer_on_self_best_finalized_id = - HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1); // read actual header, matching the `peer_on_self_best_finalized_id` from the peer chain let actual_peer_on_self_best_finalized_id = match peer_client { @@ -455,6 +453,35 @@ where }) } +/// Reads best `PeerChain` header known to the `SelfChain` using provided runtime API method. +/// +/// Method is supposed to be the `FinalityApi::best_finalized()` method. +pub async fn best_finalized_peer_header_at_self( + self_client: &Client, + at_self_hash: HashOf, + best_finalized_header_id_method_name: &str, +) -> Result, SubstrateError> +where + SelfChain: Chain, + PeerChain: Chain, +{ + // now let's read id of best finalized peer header at our best finalized block + let encoded_best_finalized_peer_on_self = self_client + .state_call( + best_finalized_header_id_method_name.into(), + Bytes(Vec::new()), + Some(at_self_hash), + ) + .await?; + let decoded_best_finalized_peer_on_self: (BlockNumberOf, HashOf) = + Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..]) + .map_err(SubstrateError::ResponseParseFailed)?; + let peer_on_self_best_finalized_id = + HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1); + + Ok(peer_on_self_best_finalized_id) +} + fn make_message_details_map( weights: Vec>, nonces: RangeInclusive, diff --git a/bridges/relays/lib-substrate-relay/src/messages_target.rs b/bridges/relays/lib-substrate-relay/src/messages_target.rs index 687d5163cb..08604e66b7 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_target.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_target.rs @@ -22,10 +22,11 @@ use crate::{ messages_lane::{MessageLaneAdapter, ReceiveMessagesProofCallBuilder, SubstrateMessageLane}, messages_metrics::StandaloneMessagesMetrics, messages_source::{ensure_messages_pallet_active, read_client_state, SubstrateMessagesProof}, - on_demand_headers::OnDemandHeadersRelay, + on_demand::OnDemandRelay, TransactionParams, }; +use async_std::sync::Arc; use async_trait::async_trait; use bp_messages::{ storage_keys::inbound_lane_data_key, total_unrewarded_messages, InboundLaneData, LaneId, @@ -42,7 +43,7 @@ use messages_relay::{ }; use num_traits::{Bounded, Zero}; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BalanceOf, Chain, ChainWithMessages, Client, + AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, TransactionSignScheme, UnsignedTransaction, WeightToFeeOf, }; @@ -63,7 +64,7 @@ pub struct SubstrateMessagesTarget { relayer_id_at_source: AccountIdOf, transaction_params: TransactionParams>, metric_values: StandaloneMessagesMetrics, - source_to_target_headers_relay: Option>, + source_to_target_headers_relay: Option>>>, } impl SubstrateMessagesTarget

{ @@ -75,7 +76,9 @@ impl SubstrateMessagesTarget

{ relayer_id_at_source: AccountIdOf, transaction_params: TransactionParams>, metric_values: StandaloneMessagesMetrics, - source_to_target_headers_relay: Option>, + source_to_target_headers_relay: Option< + Arc>>, + >, ) -> Self { SubstrateMessagesTarget { target_client, @@ -269,7 +272,7 @@ where async fn require_source_header_on_target(&self, id: SourceHeaderIdOf>) { if let Some(ref source_to_target_headers_relay) = self.source_to_target_headers_relay { - source_to_target_headers_relay.require_finalized_header(id).await; + source_to_target_headers_relay.require_more_headers(id.0).await; } } diff --git a/bridges/relays/lib-substrate-relay/src/on_demand_headers.rs b/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs similarity index 96% rename from bridges/relays/lib-substrate-relay/src/on_demand_headers.rs rename to bridges/relays/lib-substrate-relay/src/on_demand/headers.rs index 915e04f087..3d007de4db 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand_headers.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand/headers.rs @@ -14,15 +14,16 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -//! On-demand Substrate -> Substrate headers relay. +//! On-demand Substrate -> Substrate header finality relay. use async_std::sync::{Arc, Mutex}; +use async_trait::async_trait; use futures::{select, FutureExt}; use num_traits::{One, Zero}; use finality_relay::{FinalitySyncParams, SourceHeader, TargetClient as FinalityTargetClient}; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, HeaderIdOf, HeaderOf, SyncHeader, + AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, HeaderOf, SyncHeader, TransactionSignScheme, }; use relay_utils::{ @@ -35,10 +36,11 @@ use crate::{ target::SubstrateFinalityTarget, SubstrateFinalitySyncPipeline, RECENT_FINALITY_PROOFS_LIMIT, }, + on_demand::OnDemandRelay, TransactionParams, STALL_TIMEOUT, }; -/// On-demand Substrate <-> Substrate headers relay. +/// On-demand Substrate <-> Substrate header finality relay. /// /// This relay may be requested to sync more headers, whenever some other relay (e.g. messages /// relay) needs it to continue its regular work. When enough headers are relayed, on-demand stops @@ -82,20 +84,24 @@ impl OnDemandHeadersRelay { this } +} - /// Someone is asking us to relay given finalized header. - pub async fn require_finalized_header(&self, header_id: HeaderIdOf) { +#[async_trait] +impl OnDemandRelay> + for OnDemandHeadersRelay +{ + async fn require_more_headers(&self, required_header: BlockNumberOf) { let mut required_header_number = self.required_header_number.lock().await; - if header_id.0 > *required_header_number { + if required_header > *required_header_number { log::trace!( target: "bridge", "More {} headers required in {} relay. Going to sync up to the {}", SourceChain::NAME, self.relay_task_name, - header_id.0, + required_header, ); - *required_header_number = header_id.0; + *required_header_number = required_header; } } } diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs b/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs new file mode 100644 index 0000000000..7a2dfc9c15 --- /dev/null +++ b/bridges/relays/lib-substrate-relay/src/on_demand/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Types and functions intended to ease adding of new Substrate -> Substrate +//! on-demand pipelines. + +use async_trait::async_trait; + +pub mod headers; +pub mod parachains; + +/// On-demand headers relay that is relaying finalizing headers only when requested. +#[async_trait] +pub trait OnDemandRelay: Send + Sync { + /// Ask relay to relay source header with given number to the target chain. + /// + /// Depending on implementation, on-demand relay may also relay `required_header` ancestors + /// (e.g. if they're mandatory), or its descendants. The request is considered complete if + /// the best avbailable header at the target chain has number that is larger than or equal + /// to the `required_header`. + async fn require_more_headers(&self, required_header: SourceHeaderNumber); +} diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs new file mode 100644 index 0000000000..8f1bee3520 --- /dev/null +++ b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -0,0 +1,797 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! On-demand Substrate -> Substrate parachain finality relay. + +use crate::{ + messages_source::best_finalized_peer_header_at_self, + on_demand::OnDemandRelay, + parachains::{ + source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter, + SubstrateParachainsPipeline, + }, + TransactionParams, +}; + +use async_std::{ + channel::{unbounded, Receiver, Sender}, + sync::{Arc, Mutex}, +}; +use async_trait::async_trait; +use bp_polkadot_core::parachains::ParaHash; +use futures::{select, FutureExt}; +use num_traits::Zero; +use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; +use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient}; +use relay_substrate_client::{ + AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, + TransactionSignScheme, +}; +use relay_utils::{ + metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, +}; +use sp_runtime::traits::Header as HeaderT; +use std::{cmp::Ordering, collections::BTreeMap}; + +/// On-demand Substrate <-> Substrate parachain finality relay. +/// +/// This relay may be requested to sync more parachain headers, whenever some other relay +/// (e.g. messages relay) needs it to continue its regular work. When enough parachain headers +/// are relayed, on-demand stops syncing headers. +#[derive(Clone)] +pub struct OnDemandParachainsRelay { + /// Relay task name. + relay_task_name: String, + /// Channel used to communicate with background task and ask for relay of parachain heads. + required_header_number_sender: Sender>, +} + +impl OnDemandParachainsRelay { + /// Create new on-demand parachains relay. + /// + /// Note that the argument is the source relay chain client, not the parachain client. + /// That's because parachain finality is determined by the relay chain and we don't + /// need to connect to the parachain itself here. + pub fn new>( + source_relay_client: Client, + target_client: Client, + target_transaction_params: TransactionParams>, + on_demand_source_relay_to_target_headers: Arc< + dyn OnDemandRelay>, + >, + ) -> Self + where + P::SourceParachain: Chain, + P::SourceRelayChain: + Chain, + AccountIdOf: + From< as sp_core::Pair>::Public>, + P::TransactionSignScheme: TransactionSignScheme, + { + let (required_header_number_sender, required_header_number_receiver) = unbounded(); + let this = OnDemandParachainsRelay { + relay_task_name: on_demand_parachains_relay_name::(), + required_header_number_sender, + }; + async_std::task::spawn(async move { + background_task::

( + source_relay_client, + target_client, + target_transaction_params, + on_demand_source_relay_to_target_headers, + required_header_number_receiver, + ) + .await; + }); + + this + } +} + +#[async_trait] +impl OnDemandRelay> + for OnDemandParachainsRelay +where + SourceParachain: Chain, +{ + async fn require_more_headers(&self, required_header: BlockNumberOf) { + if let Err(e) = self.required_header_number_sender.send(required_header).await { + log::trace!( + target: "bridge", + "Failed to request {} header {:?} in {:?}: {:?}", + SourceParachain::NAME, + required_header, + self.relay_task_name, + e, + ); + } + } +} + +/// Background task that is responsible for starting parachain headers relay. +async fn background_task( + source_relay_client: Client, + target_client: Client, + target_transaction_params: TransactionParams>, + on_demand_source_relay_to_target_headers: Arc< + dyn OnDemandRelay>, + >, + required_parachain_header_number_receiver: Receiver>, +) where + P::SourceParachain: Chain, + P::SourceRelayChain: + Chain, + AccountIdOf: + From< as sp_core::Pair>::Public>, + P::TransactionSignScheme: TransactionSignScheme, +{ + let relay_task_name = on_demand_parachains_relay_name::(); + let target_transactions_mortality = target_transaction_params.mortality; + + let mut relay_state = RelayState::Idle; + let mut headers_map_cache = BTreeMap::new(); + let mut required_parachain_header_number = Zero::zero(); + let required_para_header_number_ref = Arc::new(Mutex::new(required_parachain_header_number)); + + let mut restart_relay = true; + let parachains_relay_task = futures::future::Fuse::terminated(); + futures::pin_mut!(parachains_relay_task); + + let mut parachains_source = ParachainsSource::

::new( + source_relay_client.clone(), + Some(required_para_header_number_ref.clone()), + ); + let mut parachains_target = + ParachainsTarget::

::new(target_client.clone(), target_transaction_params.clone()); + + loop { + select! { + new_required_parachain_header_number = required_parachain_header_number_receiver.recv().fuse() => { + let new_required_parachain_header_number = match new_required_parachain_header_number { + Ok(new_required_parachain_header_number) => new_required_parachain_header_number, + Err(e) => { + log::error!( + target: "bridge", + "Background task of {} has exited with error: {:?}", + relay_task_name, + e, + ); + + return; + }, + }; + + // keep in mind that we are not updating `required_para_header_number_ref` here, because + // then we'll be submitting all previous headers as well (while required relay headers are + // delivered) and we want to avoid that (to reduce cost) + required_parachain_header_number = std::cmp::max( + required_parachain_header_number, + new_required_parachain_header_number, + ); + }, + _ = parachains_relay_task => { + // this should never happen in practice given the current code + restart_relay = true; + }, + } + + // the workflow of the on-demand parachains relay is: + // + // 1) message relay (or any other dependent relay) sees new message at parachain header + // `PH`; 2) it sees that the target chain does not know `PH`; + // 3) it asks on-demand parachains relay to relay `PH` to the target chain; + // + // Phase#1: relaying relay chain header + // + // 4) on-demand parachains relay waits for GRANDPA-finalized block of the source relay chain + // `RH` that is storing `PH` or its descendant. Let it be `PH'`; + // 5) it asks on-demand headers relay to relay `RH` to the target chain; + // 6) it waits until `RH` (or its descendant) is relayed to the target chain; + // + // Phase#2: relaying parachain header + // + // 7) on-demand parachains relay sets `ParachainsSource::maximal_header_number` to the + // `PH'.number()`. 8) parachains finality relay sees that the parachain head has been + // updated and relays `PH'` to the target chain. + + // select headers to relay + let relay_data = read_relay_data( + ¶chains_source, + ¶chains_target, + required_parachain_header_number, + &mut headers_map_cache, + ) + .await; + match relay_data { + Ok(mut relay_data) => { + let prev_relay_state = relay_state; + relay_state = select_headers_to_relay(&mut relay_data, relay_state); + log::trace!( + target: "bridge", + "Selected new relay state in {}: {:?} using old state {:?} and data {:?}", + relay_task_name, + relay_state, + prev_relay_state, + relay_data, + ); + }, + Err(failed_client) => { + relay_utils::relay_loop::reconnect_failed_client( + failed_client, + relay_utils::relay_loop::RECONNECT_DELAY, + &mut parachains_source, + &mut parachains_target, + ) + .await; + continue + }, + } + + // we have selected our new 'state' => let's notify our source clients about our new + // requirements + match relay_state { + RelayState::Idle => (), + RelayState::RelayingRelayHeader(required_relay_header, _) => { + on_demand_source_relay_to_target_headers + .require_more_headers(required_relay_header) + .await; + }, + RelayState::RelayingParaHeader(required_para_header) => { + *required_para_header_number_ref.lock().await = required_para_header; + }, + } + + // start/restart relay + if restart_relay { + let stall_timeout = relay_substrate_client::transaction_stall_timeout( + target_transactions_mortality, + P::TargetChain::AVERAGE_BLOCK_INTERVAL, + crate::STALL_TIMEOUT, + ); + + log::info!( + target: "bridge", + "Starting {} relay\n\t\ + Tx mortality: {:?} (~{}m)\n\t\ + Stall timeout: {:?}", + relay_task_name, + target_transactions_mortality, + stall_timeout.as_secs_f64() / 60.0f64, + stall_timeout, + ); + + parachains_relay_task.set( + parachains_relay::parachains_loop::run( + parachains_source.clone(), + parachains_target.clone(), + ParachainSyncParams { + parachains: vec![P::SOURCE_PARACHAIN_PARA_ID.into()], + stall_timeout: std::time::Duration::from_secs(60), + strategy: parachains_relay::parachains_loop::ParachainSyncStrategy::Any, + }, + MetricsParams::disabled(), + futures::future::pending(), + ) + .fuse(), + ); + + restart_relay = false; + } + } +} + +/// On-demand parachains relay task name. +fn on_demand_parachains_relay_name() -> String { + format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME) +} + +/// On-demand relay state. +#[derive(Clone, Copy, Debug, PartialEq)] +enum RelayState { + /// On-demand relay is not doing anything. + Idle, + /// Relaying given relay header to relay given parachain header later. + RelayingRelayHeader(SourceRelayBlock, SourceParaBlock), + /// Relaying given parachain header. + RelayingParaHeader(SourceParaBlock), +} + +/// Data gathered from source and target clients, used by on-demand relay. +#[derive(Debug)] +struct RelayData<'a, SourceParaBlock, SourceRelayBlock> { + /// Parachain header number that is required at the target chain. + pub required_para_header: SourceParaBlock, + /// Parachain header number, known to the target chain. + pub para_header_at_target: SourceParaBlock, + /// Parachain header number, known to the source (relay) chain. + pub para_header_at_source: Option, + /// Relay header number at the source chain. + pub relay_header_at_source: SourceRelayBlock, + /// Relay header number at the target chain. + pub relay_header_at_target: SourceRelayBlock, + /// Map of relay to para header block numbers for recent relay headers. + /// + /// Even if we have been trying to relay relay header #100 to relay parachain header #50 + /// afterwards, it may happen that the relay header #200 may be relayed instead - either + /// by us (e.g. if GRANDPA justification is generated for #200, or if we are only syncing + /// mandatory headers), or by other relayer. Then, instead of parachain header #50 we may + /// relay parachain header #70. + /// + /// This cache is especially important, given that we assume that the nodes we're connected + /// to are not necessarily archive nodes. Then, if current relay chain block is #210 and #200 + /// has been delivered to the target chain, we have more chances to generate storage proof + /// at relay block #200 than on relay block #100, which is most likely has pruned state + /// already. + pub headers_map_cache: &'a mut BTreeMap, +} + +/// Read required data from source and target clients. +async fn read_relay_data<'a, P: SubstrateParachainsPipeline>( + source: &ParachainsSource

, + target: &ParachainsTarget

, + required_header_number: BlockNumberOf, + headers_map_cache: &'a mut BTreeMap< + BlockNumberOf, + BlockNumberOf, + >, +) -> Result< + RelayData<'a, BlockNumberOf, BlockNumberOf>, + FailedClient, +> +where + ParachainsTarget

: + TargetClient> + RelayClient, +{ + let map_target_err = |e| { + log::error!( + target: "bridge", + "Failed to read {} relay data from {} client: {:?}", + on_demand_parachains_relay_name::(), + P::TargetChain::NAME, + e, + ); + FailedClient::Target + }; + let map_source_err = |e| { + log::error!( + target: "bridge", + "Failed to read {} relay data from {} client: {:?}", + on_demand_parachains_relay_name::(), + P::SourceRelayChain::NAME, + e, + ); + FailedClient::Source + }; + + let best_target_block_hash = target.best_block().await.map_err(map_target_err)?.1; + let para_header_at_target = + best_finalized_peer_header_at_self::( + target.client(), + best_target_block_hash, + P::SourceParachain::BEST_FINALIZED_HEADER_ID_METHOD, + ) + .await + .map_err(map_target_err)? + .0; + + let best_finalized_relay_header = + source.client().best_finalized_header().await.map_err(map_source_err)?; + let best_finalized_relay_block_id = + HeaderId(*best_finalized_relay_header.number(), best_finalized_relay_header.hash()); + let para_header_at_source = source + .on_chain_parachain_header( + best_finalized_relay_block_id, + P::SOURCE_PARACHAIN_PARA_ID.into(), + ) + .await + .map_err(map_source_err)? + .map(|h| *h.number()); + + let relay_header_at_source = best_finalized_relay_block_id.0; + let relay_header_at_target = + best_finalized_peer_header_at_self::( + target.client(), + best_target_block_hash, + P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD, + ) + .await + .map_err(map_target_err)? + .0; + + Ok(RelayData { + required_para_header: required_header_number, + para_header_at_target, + para_header_at_source, + relay_header_at_source, + relay_header_at_target, + headers_map_cache, + }) +} + +// This number is bigger than the session length of any well-known Substrate-based relay +// chain. We expect that the underlying on-demand relay will submit at least 1 header per +// session. +const MAX_HEADERS_MAP_CACHE_ENTRIES: usize = 4096; + +/// Select relay and parachain headers that need to be relayed. +fn select_headers_to_relay<'a, SourceParaBlock, SourceRelayBlock>( + data: &mut RelayData<'a, SourceParaBlock, SourceRelayBlock>, + mut state: RelayState, +) -> RelayState +where + RelayData<'a, SourceParaBlock, SourceRelayBlock>: std::fmt::Debug, // TODO: remove + SourceParaBlock: Copy + PartialOrd, + SourceRelayBlock: Copy + Ord, +{ + // despite of our current state, we want to update the headers map cache + if let Some(para_header_at_source) = data.para_header_at_source { + data.headers_map_cache + .insert(data.relay_header_at_source, para_header_at_source); + if data.headers_map_cache.len() > MAX_HEADERS_MAP_CACHE_ENTRIES { + let first_key = *data.headers_map_cache.keys().next().expect("map is not empty; qed"); + data.headers_map_cache.remove(&first_key); + } + } + + // this switch is responsible for processing `RelayingRelayHeader` state + match state { + RelayState::Idle | RelayState::RelayingParaHeader(_) => (), + RelayState::RelayingRelayHeader(relay_header_number, para_header_number) => { + match data.relay_header_at_target.cmp(&relay_header_number) { + Ordering::Less => { + // relay header hasn't yet been relayed + return RelayState::RelayingRelayHeader(relay_header_number, para_header_number) + }, + Ordering::Equal => { + // relay header has been realyed and we may continue with parachain header + state = RelayState::RelayingParaHeader(para_header_number); + }, + Ordering::Greater => { + // relay header descendant has been relayed and we may need to change parachain + // header that we want to relay + let next_para_header_number = data + .headers_map_cache + .range(..=data.relay_header_at_target) + .next_back() + .map(|(_, next_para_header_number)| *next_para_header_number) + .unwrap_or_else(|| para_header_number); + state = RelayState::RelayingParaHeader(next_para_header_number); + }, + } + }, + } + + // this switch is responsible for processing `RelayingParaHeader` state + match state { + RelayState::Idle => (), + RelayState::RelayingRelayHeader(_, _) => unreachable!("processed by previous match; qed"), + RelayState::RelayingParaHeader(para_header_number) => { + if data.para_header_at_target < para_header_number { + // parachain header hasn't yet been relayed + return RelayState::RelayingParaHeader(para_header_number) + } + }, + } + + // if we have already satisfied our "customer", do nothing + if data.required_para_header <= data.para_header_at_target { + return RelayState::Idle + } + + // if required header is not available even at the source chain, let's wait + if Some(data.required_para_header) > data.para_header_at_source { + return RelayState::Idle + } + + // we will always try to sync latest parachain/relay header, even if we've been asked for some + // its ancestor + + // we need relay chain header first + if data.relay_header_at_target < data.relay_header_at_source { + return RelayState::RelayingRelayHeader( + data.relay_header_at_source, + data.required_para_header, + ) + } + + // if all relay headers synced, we may start directly with parachain header + RelayState::RelayingParaHeader(data.required_para_header) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn relay_waits_for_relay_header_to_be_delivered() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::RelayingRelayHeader(750, 100), + ), + RelayState::RelayingRelayHeader(750, 100), + ); + } + + #[test] + fn relay_starts_relaying_requested_para_header_after_relay_header_is_delivered() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 750, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::RelayingRelayHeader(750, 100), + ), + RelayState::RelayingParaHeader(100), + ); + } + + #[test] + fn relay_selects_same_para_header_after_better_relay_header_is_delivered_1() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 780, + headers_map_cache: &mut vec![(700, 90), (750, 100)].into_iter().collect(), + }, + RelayState::RelayingRelayHeader(750, 100), + ), + RelayState::RelayingParaHeader(100), + ); + } + + #[test] + fn relay_selects_same_para_header_after_better_relay_header_is_delivered_2() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 780, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::RelayingRelayHeader(750, 100), + ), + RelayState::RelayingParaHeader(100), + ); + } + + #[test] + fn relay_selects_better_para_header_after_better_relay_header_is_delivered() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 50, + para_header_at_source: Some(120), + relay_header_at_source: 800, + relay_header_at_target: 780, + headers_map_cache: &mut vec![(700, 90), (750, 100), (780, 110), (790, 120)] + .into_iter() + .collect(), + }, + RelayState::RelayingRelayHeader(750, 100), + ), + RelayState::RelayingParaHeader(110), + ); + } + + #[test] + fn relay_waits_for_para_header_to_be_delivered() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::RelayingParaHeader(100), + ), + RelayState::RelayingParaHeader(100), + ); + } + + #[test] + fn relay_stays_idle_if_required_para_header_is_already_delivered() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 100, + para_header_at_target: 100, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::Idle, + ), + RelayState::Idle, + ); + } + + #[test] + fn relay_waits_for_required_para_header_to_appear_at_source_1() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 110, + para_header_at_target: 100, + para_header_at_source: None, + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::Idle, + ), + RelayState::Idle, + ); + } + + #[test] + fn relay_waits_for_required_para_header_to_appear_at_source_2() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 110, + para_header_at_target: 100, + para_header_at_source: Some(100), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::Idle, + ), + RelayState::Idle, + ); + } + + #[test] + fn relay_starts_relaying_relay_header_when_new_para_header_is_requested() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 110, + para_header_at_target: 100, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::Idle, + ), + RelayState::RelayingRelayHeader(800, 110), + ); + } + + #[test] + fn relay_starts_relaying_para_header_when_new_para_header_is_requested() { + assert_eq!( + select_headers_to_relay( + &mut RelayData { + required_para_header: 110, + para_header_at_target: 100, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 800, + headers_map_cache: &mut BTreeMap::new(), + }, + RelayState::Idle, + ), + RelayState::RelayingParaHeader(110), + ); + } + + #[test] + fn headers_map_cache_is_updated() { + let mut headers_map_cache = BTreeMap::new(); + + // when parachain header is known, map is updated + select_headers_to_relay( + &mut RelayData { + required_para_header: 0, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut headers_map_cache, + }, + RelayState::RelayingRelayHeader(750, 100), + ); + assert_eq!(headers_map_cache.clone().into_iter().collect::>(), vec![(800, 110)],); + + // when parachain header is not known, map is NOT updated + select_headers_to_relay( + &mut RelayData { + required_para_header: 0, + para_header_at_target: 50, + para_header_at_source: None, + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut headers_map_cache, + }, + RelayState::RelayingRelayHeader(750, 100), + ); + assert_eq!(headers_map_cache.clone().into_iter().collect::>(), vec![(800, 110)],); + + // map auto-deduplicates equal entries + select_headers_to_relay( + &mut RelayData { + required_para_header: 0, + para_header_at_target: 50, + para_header_at_source: Some(110), + relay_header_at_source: 800, + relay_header_at_target: 700, + headers_map_cache: &mut headers_map_cache, + }, + RelayState::RelayingRelayHeader(750, 100), + ); + assert_eq!(headers_map_cache.clone().into_iter().collect::>(), vec![(800, 110)],); + + // nothing is pruned if number of map entries is < MAX_HEADERS_MAP_CACHE_ENTRIES + for i in 1..MAX_HEADERS_MAP_CACHE_ENTRIES { + select_headers_to_relay( + &mut RelayData { + required_para_header: 0, + para_header_at_target: 50, + para_header_at_source: Some(110 + i), + relay_header_at_source: 800 + i, + relay_header_at_target: 700, + headers_map_cache: &mut headers_map_cache, + }, + RelayState::RelayingRelayHeader(750, 100), + ); + assert_eq!(headers_map_cache.len(), i + 1); + } + + // when we add next entry, the oldest one is pruned + assert!(headers_map_cache.contains_key(&800)); + assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES); + select_headers_to_relay( + &mut RelayData { + required_para_header: 0, + para_header_at_target: 50, + para_header_at_source: Some(110 + MAX_HEADERS_MAP_CACHE_ENTRIES), + relay_header_at_source: 800 + MAX_HEADERS_MAP_CACHE_ENTRIES, + relay_header_at_target: 700, + headers_map_cache: &mut headers_map_cache, + }, + RelayState::RelayingRelayHeader(750, 100), + ); + assert!(!headers_map_cache.contains_key(&800)); + assert_eq!(headers_map_cache.len(), MAX_HEADERS_MAP_CACHE_ENTRIES); + } +} diff --git a/bridges/relays/lib-substrate-relay/src/parachains/mod.rs b/bridges/relays/lib-substrate-relay/src/parachains/mod.rs new file mode 100644 index 0000000000..ef14b7bb10 --- /dev/null +++ b/bridges/relays/lib-substrate-relay/src/parachains/mod.rs @@ -0,0 +1,110 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Types and functions intended to ease adding of new Substrate -> Substrate +//! parachain finality proofs synchronization pipelines. + +use async_trait::async_trait; +use bp_polkadot_core::parachains::{ParaHeadsProof, ParaId}; +use pallet_bridge_parachains::{ + Call as BridgeParachainsCall, Config as BridgeParachainsConfig, RelayBlockHash, + RelayBlockHasher, RelayBlockNumber, +}; +use parachains_relay::ParachainsPipeline; +use relay_substrate_client::{CallOf, Chain, HashOf, RelayChain, TransactionSignScheme}; +use std::{fmt::Debug, marker::PhantomData}; + +pub mod source; +pub mod target; + +/// Substrate -> Substrate parachain finality proofs synchronization pipeline. +/// +/// This is currently restricted to the single parachain, because it is how it +/// will be used (at least) initially. +#[async_trait] +pub trait SubstrateParachainsPipeline: 'static + Clone + Debug + Send + Sync { + /// Headers of this parachain are submitted to the `Self::TargetChain`. + type SourceParachain: Chain; + /// Relay chain that is storing headers of `Self::SourceParachain`. + type SourceRelayChain: RelayChain; + /// Target chain where `Self::SourceParachain` headers are submitted. + type TargetChain: Chain; + + /// How submit parachains heads call is built? + type SubmitParachainHeadsCallBuilder: SubmitParachainHeadsCallBuilder; + /// Scheme used to sign target chain transactions. + type TransactionSignScheme: TransactionSignScheme; + + /// Id of the `Self::SourceParachain`, used for registration in `Self::SourceRelayChain`. + const SOURCE_PARACHAIN_PARA_ID: u32; +} + +/// Adapter that allows all `SubstrateParachainsPipeline` to act as `ParachainsPipeline`. +#[derive(Clone, Debug)] +pub struct ParachainsPipelineAdapter { + _phantom: PhantomData

, +} + +impl ParachainsPipeline for ParachainsPipelineAdapter

{ + type SourceChain = P::SourceRelayChain; + type TargetChain = P::TargetChain; +} + +/// Different ways of building `submit_parachain_heads` calls. +pub trait SubmitParachainHeadsCallBuilder: + 'static + Send + Sync +{ + /// Given parachains and their heads proof, build call of `submit_parachain_heads` + /// function of bridge parachains module at the target chain. + fn build_submit_parachain_heads_call( + relay_block_hash: HashOf, + parachains: Vec, + parachain_heads_proof: ParaHeadsProof, + ) -> CallOf; +} + +/// Building `submit_parachain_heads` call when you have direct access to the target +/// chain runtime. +pub struct DirectSubmitParachainHeadsCallBuilder { + _phantom: PhantomData<(P, R, I)>, +} + +impl SubmitParachainHeadsCallBuilder

for DirectSubmitParachainHeadsCallBuilder +where + P: SubstrateParachainsPipeline, + P::SourceRelayChain: Chain, + R: BridgeParachainsConfig + Send + Sync, + I: 'static + Send + Sync, + R::BridgedChain: bp_runtime::Chain< + BlockNumber = RelayBlockNumber, + Hash = RelayBlockHash, + Hasher = RelayBlockHasher, + >, + CallOf: From>, +{ + fn build_submit_parachain_heads_call( + relay_block_hash: HashOf, + parachains: Vec, + parachain_heads_proof: ParaHeadsProof, + ) -> CallOf { + BridgeParachainsCall::::submit_parachain_heads { + relay_block_hash, + parachains, + parachain_heads_proof, + } + .into() + } +} diff --git a/bridges/relays/lib-substrate-relay/src/parachains/source.rs b/bridges/relays/lib-substrate-relay/src/parachains/source.rs new file mode 100644 index 0000000000..3ae735ab89 --- /dev/null +++ b/bridges/relays/lib-substrate-relay/src/parachains/source.rs @@ -0,0 +1,163 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Parachain heads source. + +use crate::{ + finality::source::RequiredHeaderNumberRef, + parachains::{ParachainsPipelineAdapter, SubstrateParachainsPipeline}, +}; + +use async_std::sync::{Arc, Mutex}; +use async_trait::async_trait; +use bp_parachains::parachain_head_storage_key_at_source; +use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; +use codec::Decode; +use parachains_relay::parachains_loop::SourceClient; +use relay_substrate_client::{ + Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain, +}; +use relay_utils::relay_loop::Client as RelayClient; +use sp_runtime::traits::Header as HeaderT; + +/// Substrate client as parachain heads source. +#[derive(Clone)] +pub struct ParachainsSource { + client: Client, + maximal_header_number: Option>, + previous_parachain_head: Arc>>, +} + +impl ParachainsSource

{ + /// Creates new parachains source client. + pub fn new( + client: Client, + maximal_header_number: Option>, + ) -> Self { + let previous_parachain_head = Arc::new(Mutex::new(None)); + ParachainsSource { client, maximal_header_number, previous_parachain_head } + } + + /// Returns reference to the underlying RPC client. + pub fn client(&self) -> &Client { + &self.client + } + + /// Return decoded head of given parachain. + pub async fn on_chain_parachain_header( + &self, + at_block: HeaderIdOf, + para_id: ParaId, + ) -> Result>, SubstrateError> { + let storage_key = + parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id); + let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?; + let para_head = para_head.map(|h| ParaHead::decode(&mut &h.0[..])).transpose()?; + let para_head = match para_head { + Some(para_head) => para_head, + None => return Ok(None), + }; + + Ok(Some(Decode::decode(&mut ¶_head.0[..])?)) + } +} + +#[async_trait] +impl RelayClient for ParachainsSource

{ + type Error = SubstrateError; + + async fn reconnect(&mut self) -> Result<(), SubstrateError> { + self.client.reconnect().await + } +} + +#[async_trait] +impl SourceClient> + for ParachainsSource

+where + P::SourceParachain: Chain, +{ + async fn ensure_synced(&self) -> Result { + match self.client.ensure_synced().await { + Ok(_) => Ok(true), + Err(SubstrateError::ClientNotSynced(_)) => Ok(false), + Err(e) => Err(e), + } + } + + async fn parachain_head( + &self, + at_block: HeaderIdOf, + para_id: ParaId, + ) -> Result, Self::Error> { + // we don't need to support many parachains now + if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID { + return Err(SubstrateError::Custom(format!( + "Parachain id {} is not matching expected {}", + para_id.0, + P::SOURCE_PARACHAIN_PARA_ID, + ))) + } + + let parachain_head = match self.on_chain_parachain_header(at_block, para_id).await? { + Some(parachain_header) => { + let mut parachain_head = Some(parachain_header.hash()); + // never return head that is larger than requested. This way we'll never sync + // headers past `maximal_header_number` + if let Some(ref maximal_header_number) = self.maximal_header_number { + let maximal_header_number = *maximal_header_number.lock().await; + if *parachain_header.number() > maximal_header_number { + let previous_parachain_head = *self.previous_parachain_head.lock().await; + if let Some(previous_parachain_head) = previous_parachain_head { + parachain_head = Some(previous_parachain_head); + } + } + } + + parachain_head + }, + None => None, + }; + + *self.previous_parachain_head.lock().await = parachain_head; + + Ok(parachain_head) + } + + async fn prove_parachain_heads( + &self, + at_block: HeaderIdOf, + parachains: &[ParaId], + ) -> Result { + let storage_keys = parachains + .iter() + .map(|para_id| { + parachain_head_storage_key_at_source( + P::SourceRelayChain::PARAS_PALLET_NAME, + *para_id, + ) + }) + .collect(); + let parachain_heads_proof = self + .client + .prove_storage(storage_keys, at_block.1) + .await? + .iter_nodes() + .collect(); + + Ok(parachain_heads_proof) + } +} diff --git a/bridges/relays/lib-substrate-relay/src/parachains_target.rs b/bridges/relays/lib-substrate-relay/src/parachains/target.rs similarity index 50% rename from bridges/relays/lib-substrate-relay/src/parachains_target.rs rename to bridges/relays/lib-substrate-relay/src/parachains/target.rs index ed0456c06f..3a4e0b2c27 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains_target.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/target.rs @@ -16,112 +16,58 @@ //! Parachain heads target. -use crate::TransactionParams; +use crate::{ + parachains::{ + ParachainsPipelineAdapter, SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline, + }, + TransactionParams, +}; use async_trait::async_trait; use bp_parachains::{parachain_head_storage_key_at_target, BestParaHeadHash}; use bp_polkadot_core::parachains::{ParaHeadsProof, ParaId}; use codec::{Decode, Encode}; -use pallet_bridge_parachains::{ - Call as BridgeParachainsCall, Config as BridgeParachainsConfig, RelayBlockHash, - RelayBlockHasher, RelayBlockNumber, -}; -use parachains_relay::{parachains_loop::TargetClient, ParachainsPipeline}; +use parachains_relay::parachains_loop::TargetClient; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError, - HashOf, HeaderIdOf, SignParam, TransactionEra, TransactionSignScheme, UnsignedTransaction, + AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, + HeaderIdOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme, UnsignedTransaction, }; use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::{Bytes, Pair}; use sp_runtime::traits::Header as HeaderT; -use std::marker::PhantomData; - -/// Different ways of building `submit_parachain_heads` calls. -pub trait SubmitParachainHeadsCallBuilder: 'static + Send + Sync { - /// Given parachains and their heads proof, build call of `submit_parachain_heads` - /// function of bridge parachains module at the target chain. - fn build_submit_parachain_heads_call( - relay_block_hash: HashOf, - parachains: Vec, - parachain_heads_proof: ParaHeadsProof, - ) -> CallOf; -} - -/// Building `submit_parachain_heads` call when you have direct access to the target -/// chain runtime. -pub struct DirectSubmitParachainHeadsCallBuilder { - _phantom: PhantomData<(P, R, I)>, -} - -impl SubmitParachainHeadsCallBuilder

for DirectSubmitParachainHeadsCallBuilder -where - P: ParachainsPipeline, - P::SourceChain: Chain, - R: BridgeParachainsConfig + Send + Sync, - I: 'static + Send + Sync, - R::BridgedChain: bp_runtime::Chain< - BlockNumber = RelayBlockNumber, - Hash = RelayBlockHash, - Hasher = RelayBlockHasher, - >, - CallOf: From>, -{ - fn build_submit_parachain_heads_call( - relay_block_hash: HashOf, - parachains: Vec, - parachain_heads_proof: ParaHeadsProof, - ) -> CallOf { - BridgeParachainsCall::::submit_parachain_heads { - relay_block_hash, - parachains, - parachain_heads_proof, - } - .into() - } -} /// Substrate client as parachain heads source. -pub struct ParachainsTarget { +pub struct ParachainsTarget { client: Client, - transaction_params: TransactionParams>, - bridge_paras_pallet_name: String, - _phantom: PhantomData, + transaction_params: TransactionParams>, } -impl ParachainsTarget { +impl ParachainsTarget

{ /// Creates new parachains target client. pub fn new( client: Client, - transaction_params: TransactionParams>, - bridge_paras_pallet_name: String, + transaction_params: TransactionParams>, ) -> Self { - ParachainsTarget { - client, - transaction_params, - bridge_paras_pallet_name, - _phantom: Default::default(), - } + ParachainsTarget { client, transaction_params } + } + + /// Returns reference to the underlying RPC client. + pub fn client(&self) -> &Client { + &self.client } } -impl Clone for ParachainsTarget { +impl Clone for ParachainsTarget

{ fn clone(&self) -> Self { ParachainsTarget { client: self.client.clone(), transaction_params: self.transaction_params.clone(), - bridge_paras_pallet_name: self.bridge_paras_pallet_name.clone(), - _phantom: Default::default(), } } } #[async_trait] -impl< - P: ParachainsPipeline, - S: 'static + TransactionSignScheme, - CB: SubmitParachainHeadsCallBuilder

, - > RelayClient for ParachainsTarget -{ +impl RelayClient for ParachainsTarget

{ type Error = SubstrateError; async fn reconnect(&mut self) -> Result<(), SubstrateError> { @@ -130,12 +76,11 @@ impl< } #[async_trait] -impl TargetClient

for ParachainsTarget +impl

TargetClient> for ParachainsTarget

where - P: ParachainsPipeline, - S: 'static + TransactionSignScheme, - CB: SubmitParachainHeadsCallBuilder

, - AccountIdOf: From< as Pair>::Public>, + P: SubstrateParachainsPipeline, + P::TransactionSignScheme: TransactionSignScheme, + AccountIdOf: From< as Pair>::Public>, { async fn best_block(&self) -> Result, Self::Error> { let best_header = self.client.best_header().await?; @@ -148,18 +93,18 @@ where async fn best_finalized_source_block( &self, at_block: &HeaderIdOf, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let encoded_best_finalized_source_block = self .client .state_call( - P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD.into(), + P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD.into(), Bytes(Vec::new()), Some(at_block.1), ) .await?; let decoded_best_finalized_source_block: ( - BlockNumberOf, - HashOf, + BlockNumberOf, + HashOf, ) = Decode::decode(&mut &encoded_best_finalized_source_block.0[..]) .map_err(SubstrateError::ResponseParseFailed)?; Ok(HeaderId(decoded_best_finalized_source_block.0, decoded_best_finalized_source_block.1)) @@ -170,8 +115,10 @@ where at_block: HeaderIdOf, para_id: ParaId, ) -> Result, Self::Error> { - let storage_key = - parachain_head_storage_key_at_target(&self.bridge_paras_pallet_name, para_id); + let storage_key = parachain_head_storage_key_at_target( + P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME, + para_id, + ); let para_head = self.client.storage_value(storage_key, Some(at_block.1)).await?; Ok(para_head) @@ -179,21 +126,24 @@ where async fn submit_parachain_heads_proof( &self, - at_relay_block: HeaderIdOf, + at_relay_block: HeaderIdOf, updated_parachains: Vec, proof: ParaHeadsProof, ) -> Result<(), Self::Error> { let genesis_hash = *self.client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; - let call = - CB::build_submit_parachain_heads_call(at_relay_block.1, updated_parachains, proof); + let call = P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call( + at_relay_block.1, + updated_parachains, + proof, + ); self.client .submit_signed_extrinsic( self.transaction_params.signer.public().into(), move |best_block_id, transaction_nonce| { Ok(Bytes( - S::sign_transaction(SignParam { + P::TransactionSignScheme::sign_transaction(SignParam { spec_version, transaction_version, genesis_hash, diff --git a/bridges/relays/lib-substrate-relay/src/parachains_source.rs b/bridges/relays/lib-substrate-relay/src/parachains_source.rs deleted file mode 100644 index 9e8d378d8b..0000000000 --- a/bridges/relays/lib-substrate-relay/src/parachains_source.rs +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Parachain heads source. - -use async_trait::async_trait; -use bp_parachains::parachain_head_storage_key_at_source; -use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; -use codec::Decode; -use parachains_relay::{parachains_loop::SourceClient, ParachainsPipeline}; -use relay_substrate_client::{Client, Error as SubstrateError, HeaderIdOf}; -use relay_utils::relay_loop::Client as RelayClient; - -/// Substrate client as parachain heads source. -#[derive(Clone)] -pub struct ParachainsSource { - client: Client, - paras_pallet_name: String, -} - -impl ParachainsSource

{ - /// Creates new parachains source client. - pub fn new(client: Client, paras_pallet_name: String) -> Self { - ParachainsSource { client, paras_pallet_name } - } -} - -#[async_trait] -impl RelayClient for ParachainsSource

{ - type Error = SubstrateError; - - async fn reconnect(&mut self) -> Result<(), SubstrateError> { - self.client.reconnect().await - } -} - -#[async_trait] -impl SourceClient

for ParachainsSource

{ - async fn ensure_synced(&self) -> Result { - match self.client.ensure_synced().await { - Ok(_) => Ok(true), - Err(SubstrateError::ClientNotSynced(_)) => Ok(false), - Err(e) => Err(e), - } - } - - async fn parachain_head( - &self, - at_block: HeaderIdOf, - para_id: ParaId, - ) -> Result, Self::Error> { - let storage_key = parachain_head_storage_key_at_source(&self.paras_pallet_name, para_id); - let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?; - let para_head = para_head.map(|h| ParaHead::decode(&mut &h.0[..])).transpose()?; - let para_hash = para_head.map(|h| h.hash()); - - Ok(para_hash) - } - - async fn prove_parachain_heads( - &self, - at_block: HeaderIdOf, - parachains: &[ParaId], - ) -> Result { - let storage_keys = parachains - .iter() - .map(|para_id| parachain_head_storage_key_at_source(&self.paras_pallet_name, *para_id)) - .collect(); - let parachain_heads_proof = self - .client - .prove_storage(storage_keys, at_block.1) - .await? - .iter_nodes() - .collect(); - - Ok(parachain_heads_proof) - } -}