on-demand headers relay (#833)

* on-demand headers relay

* bool::then

* move file

* atomic submit_signed_extrinsic

* remove cli options from future

* test on-demand relay

* TODOs

* fixed initialization call for Westend -> Millau

* Update relays/client-substrate/src/client.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* removed on_demand_headers_relay.rs

* on_demand_headers_relay traces

* fix compilation

* fmt

* docs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2021-03-26 08:49:59 +03:00
committed by Bastian Köcher
parent 51fc83941b
commit 4e4e9a8e4e
18 changed files with 367 additions and 223 deletions
@@ -156,13 +156,21 @@ impl SubmitEthereumHeaders for SubstrateClient<Rialto> {
) -> SubmittedHeaders<EthereumHeaderId, RpcError> {
let ids = headers.iter().map(|header| header.id()).collect();
let submission_result = async {
let account_id = params.signer.public().as_array_ref().clone().into();
let nonce = self.next_account_index(account_id).await?;
let call = instance.build_signed_header_call(headers);
let transaction = Rialto::sign_transaction(*self.genesis_hash(), &params.signer, nonce, call);
let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?;
self.submit_signed_extrinsic(
params.signer.public().as_array_ref().clone().into(),
|transaction_nonce| {
Bytes(
Rialto::sign_transaction(
*self.genesis_hash(),
&params.signer,
transaction_nonce,
instance.build_signed_header_call(headers),
)
.encode(),
)
},
)
.await?;
Ok(())
}
.await;
@@ -197,7 +205,7 @@ impl SubmitEthereumHeaders for SubstrateClient<Rialto> {
let call = instance.build_unsigned_header_call(header);
let transaction = create_unsigned_submit_transaction(call);
match self.submit_extrinsic(Bytes(transaction.encode())).await {
match self.submit_unsigned_extrinsic(Bytes(transaction.encode())).await {
Ok(_) => submitted_headers.submitted.push(id),
Err(error) => {
submitted_headers.rejected.push(id);
@@ -252,13 +260,21 @@ impl SubmitEthereumExchangeTransactionProof for SubstrateClient<Rialto> {
instance: Arc<dyn BridgeInstance>,
proof: rialto_runtime::exchange::EthereumTransactionInclusionProof,
) -> RpcResult<()> {
let account_id = params.signer.public().as_array_ref().clone().into();
let nonce = self.next_account_index(account_id).await?;
let call = instance.build_currency_exchange_call(proof);
let transaction = Rialto::sign_transaction(*self.genesis_hash(), &params.signer, nonce, call);
let _ = self.submit_extrinsic(Bytes(transaction.encode())).await?;
self.submit_signed_extrinsic(
params.signer.public().as_array_ref().clone().into(),
|transaction_nonce| {
Bytes(
Rialto::sign_transaction(
*self.genesis_hash(),
&params.signer,
transaction_nonce,
instance.build_currency_exchange_call(proof),
)
.encode(),
)
},
)
.await?;
Ok(())
}
}
@@ -18,14 +18,13 @@
use crate::finality_target::SubstrateFinalityTarget;
use async_trait::async_trait;
use codec::Encode;
use finality_relay::{FinalitySyncParams, FinalitySyncPipeline};
use relay_substrate_client::{
finality_source::{FinalitySource, Justification},
BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, SyncHeader,
BlockNumberOf, Chain, Client, HashOf, SyncHeader,
};
use relay_utils::BlockNumberBase;
use sp_core::Bytes;
use std::{fmt::Debug, marker::PhantomData, time::Duration};
/// Default synchronization loop timeout.
@@ -37,20 +36,23 @@ const STALL_TIMEOUT: Duration = Duration::from_secs(120);
const RECENT_FINALITY_PROOFS_LIMIT: usize = 4096;
/// Headers sync pipeline for Substrate <-> Substrate relays.
#[async_trait]
pub trait SubstrateFinalitySyncPipeline: FinalitySyncPipeline {
/// Name of the runtime method that returns id of best finalized source header at target chain.
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str;
/// Signed transaction type.
type SignedTransaction: Send + Sync + Encode;
/// Chain with GRANDPA bridge pallet.
type TargetChain: Chain;
/// Returns id of account that we're using to sign transactions at target chain.
fn transactions_author(&self) -> <Self::TargetChain as Chain>::AccountId;
/// Make submit header transaction.
async fn make_submit_finality_proof_transaction(
fn make_submit_finality_proof_transaction(
&self,
transaction_nonce: <Self::TargetChain as Chain>::Index,
header: Self::Header,
proof: Self::FinalityProof,
) -> Result<Self::SignedTransaction, SubstrateError>;
) -> Bytes;
}
/// Substrate-to-Substrate finality proof pipeline.
@@ -105,6 +107,7 @@ where
Number = BlockNumberOf<SourceChain>,
Header = SyncHeader<SourceChain::Header>,
FinalityProof = Justification<SourceChain::BlockNumber>,
TargetChain = TargetChain,
>,
SourceChain: Clone + Chain,
BlockNumberOf<SourceChain>: BlockNumberBase,
@@ -21,12 +21,10 @@
use crate::finality_pipeline::SubstrateFinalitySyncPipeline;
use async_trait::async_trait;
use codec::{Decode, Encode};
use codec::Decode;
use finality_relay::TargetClient;
use futures::TryFutureExt;
use relay_substrate_client::{Chain, Client, Error as SubstrateError};
use relay_utils::relay_loop::Client as RelayClient;
use sp_core::Bytes;
/// Substrate client as Substrate finality target.
pub struct SubstrateFinalityTarget<C: Chain, P> {
@@ -65,7 +63,7 @@ where
C: Chain,
P::Number: Decode,
P::Hash: Decode,
P: SubstrateFinalitySyncPipeline,
P: SubstrateFinalitySyncPipeline<TargetChain = C>,
{
async fn best_finalized_source_block_number(&self) -> Result<P::Number, SubstrateError> {
// we can't continue to relay finality if target node is out of sync, because
@@ -82,9 +80,11 @@ where
}
async fn submit_finality_proof(&self, header: P::Header, proof: P::FinalityProof) -> Result<(), SubstrateError> {
self.pipeline
.make_submit_finality_proof_transaction(header, proof)
.and_then(|tx| self.client.submit_extrinsic(Bytes(tx.encode())))
self.client
.submit_signed_extrinsic(self.pipeline.transactions_author(), move |transaction_nonce| {
self.pipeline
.make_submit_finality_proof_transaction(transaction_nonce, header, proof)
})
.await
.map(drop)
}
@@ -38,9 +38,16 @@ use sp_runtime::traits::Header as HeaderT;
pub async fn initialize<SourceChain: Chain, TargetChain: Chain>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
prepare_initialize_transaction: impl FnOnce(InitializationData<SourceChain::Header>) -> Result<Bytes, String>,
target_transactions_signer: TargetChain::AccountId,
prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData<SourceChain::Header>) -> Bytes,
) {
let result = do_initialize(source_client, target_client, prepare_initialize_transaction).await;
let result = do_initialize(
source_client,
target_client,
target_transactions_signer,
prepare_initialize_transaction,
)
.await;
match result {
Ok(tx_hash) => log::info!(
@@ -64,7 +71,8 @@ pub async fn initialize<SourceChain: Chain, TargetChain: Chain>(
async fn do_initialize<SourceChain: Chain, TargetChain: Chain>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
prepare_initialize_transaction: impl FnOnce(InitializationData<SourceChain::Header>) -> Result<Bytes, String>,
target_transactions_signer: TargetChain::AccountId,
prepare_initialize_transaction: impl FnOnce(TargetChain::Index, InitializationData<SourceChain::Header>) -> Bytes,
) -> Result<TargetChain::Hash, String> {
let initialization_data = prepare_initialization_data(source_client).await?;
log::info!(
@@ -75,9 +83,10 @@ async fn do_initialize<SourceChain: Chain, TargetChain: Chain>(
initialization_data,
);
let initialization_tx = prepare_initialize_transaction(initialization_data)?;
let initialization_tx_hash = target_client
.submit_extrinsic(initialization_tx)
.submit_signed_extrinsic(target_transactions_signer, move |transaction_nonce| {
prepare_initialize_transaction(transaction_nonce, initialization_data)
})
.await
.map_err(|err| format!("Failed to submit {} transaction: {:?}", TargetChain::NAME, err))?;
Ok(initialization_tx_hash)
@@ -17,17 +17,15 @@
use crate::messages_source::SubstrateMessagesProof;
use crate::messages_target::SubstrateMessagesReceivingProof;
use async_trait::async_trait;
use bp_messages::MessageNonce;
use codec::Encode;
use frame_support::weights::Weight;
use messages_relay::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use relay_substrate_client::{BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf};
use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf};
use relay_utils::BlockNumberBase;
use sp_core::Bytes;
use std::ops::RangeInclusive;
/// Message sync pipeline for Substrate <-> Substrate relays.
#[async_trait]
pub trait SubstrateMessageLane: MessageLane {
/// Name of the runtime method that returns dispatch weight of outbound messages at the source chain.
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str;
@@ -48,25 +46,33 @@ pub trait SubstrateMessageLane: MessageLane {
/// Name of the runtime method that returns id of best finalized target header at source chain.
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str;
/// Signed transaction type of the source chain.
type SourceSignedTransaction: Send + Sync + Encode;
/// Signed transaction type of the target chain.
type TargetSignedTransaction: Send + Sync + Encode;
/// Source chain.
type SourceChain: Chain;
/// Target chain.
type TargetChain: Chain;
/// Returns id of account that we're using to sign transactions at target chain (messages proof).
fn target_transactions_author(&self) -> <Self::TargetChain as Chain>::AccountId;
/// Make messages delivery transaction.
async fn make_messages_delivery_transaction(
fn make_messages_delivery_transaction(
&self,
transaction_nonce: <Self::TargetChain as Chain>::Index,
generated_at_header: SourceHeaderIdOf<Self>,
nonces: RangeInclusive<MessageNonce>,
proof: Self::MessagesProof,
) -> Result<Self::TargetSignedTransaction, SubstrateError>;
) -> Bytes;
/// Returns id of account that we're using to sign transactions at source chain (delivery proof).
fn source_transactions_author(&self) -> <Self::SourceChain as Chain>::AccountId;
/// Make messages receiving proof transaction.
async fn make_messages_receiving_proof_transaction(
fn make_messages_receiving_proof_transaction(
&self,
transaction_nonce: <Self::SourceChain as Chain>::Index,
generated_at_header: TargetHeaderIdOf<Self>,
proof: Self::MessagesReceivingProof,
) -> Result<Self::SourceSignedTransaction, SubstrateError>;
) -> Bytes;
}
/// Substrate-to-Substrate message lane.
@@ -94,6 +94,7 @@ where
MessagesProof = SubstrateMessagesProof<C>,
SourceHeaderNumber = <C::Header as HeaderT>::Number,
SourceHeaderHash = <C::Header as HeaderT>::Hash,
SourceChain = C,
>,
P::TargetHeaderNumber: Decode,
P::TargetHeaderHash: Decode,
@@ -197,13 +198,16 @@ where
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<(), SubstrateError> {
let tx = self
.lane
.make_messages_receiving_proof_transaction(generated_at_block, proof)
self.client
.submit_signed_extrinsic(self.lane.source_transactions_author(), move |transaction_nonce| {
self.lane
.make_messages_receiving_proof_transaction(transaction_nonce, generated_at_block, proof)
})
.await?;
self.client.submit_extrinsic(Bytes(tx.encode())).await?;
Ok(())
}
async fn activate_target_to_source_headers_relay(&self, _activate: bool) {}
}
pub async fn read_client_state<SelfChain, BridgedHeaderHash, BridgedHeaderNumber>(
@@ -90,6 +90,7 @@ where
C::Index: DeserializeOwned,
<C::Header as HeaderT>::Number: BlockNumberBase,
P: SubstrateMessageLane<
TargetChain = C,
MessagesReceivingProof = SubstrateMessagesReceivingProof<C>,
TargetHeaderNumber = <C::Header as HeaderT>::Number,
TargetHeaderHash = <C::Header as HeaderT>::Hash,
@@ -183,11 +184,18 @@ where
nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, SubstrateError> {
let tx = self
.lane
.make_messages_delivery_transaction(generated_at_header, nonces.clone(), proof)
self.client
.submit_signed_extrinsic(self.lane.target_transactions_author(), |transaction_nonce| {
self.lane.make_messages_delivery_transaction(
transaction_nonce,
generated_at_header,
nonces.clone(),
proof,
)
})
.await?;
self.client.submit_extrinsic(Bytes(tx.encode())).await?;
Ok(nonces)
}
async fn activate_source_to_target_headers_relay(&self, _activate: bool) {}
}
@@ -19,36 +19,38 @@
use super::{MillauClient, RialtoClient};
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
use async_trait::async_trait;
use codec::Encode;
use relay_millau_client::{Millau, SyncHeader as MillauSyncHeader};
use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme};
use sp_core::Pair;
use relay_substrate_client::{finality_source::Justification, Chain, TransactionSignScheme};
use sp_core::{Bytes, Pair};
/// Millau-to-Rialto finality sync pipeline.
pub(crate) type MillauFinalityToRialto = SubstrateFinalityToSubstrate<Millau, Rialto, RialtoSigningParams>;
#[async_trait]
impl SubstrateFinalitySyncPipeline for MillauFinalityToRialto {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD;
type SignedTransaction = <Rialto as TransactionSignScheme>::SignedTransaction;
type TargetChain = Rialto;
async fn make_submit_finality_proof_transaction(
fn transactions_author(&self) -> bp_rialto::AccountId {
self.target_sign.signer.public().as_array_ref().clone().into()
}
fn make_submit_finality_proof_transaction(
&self,
transaction_nonce: <Rialto as Chain>::Index,
header: MillauSyncHeader,
proof: Justification<bp_millau::BlockNumber>,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
) -> Bytes {
let call =
rialto_runtime::BridgeGrandpaMillauCall::submit_finality_proof(header.into_inner(), proof.into_inner())
.into();
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Rialto::sign_transaction(genesis_hash, &self.target_sign.signer, nonce, call);
let transaction = Rialto::sign_transaction(genesis_hash, &self.target_sign.signer, transaction_nonce, call);
Ok(transaction)
Bytes(transaction.encode())
}
}
@@ -21,7 +21,6 @@ use crate::messages_lane::{select_delivery_transaction_limits, SubstrateMessageL
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
use async_trait::async_trait;
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE};
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
@@ -30,15 +29,14 @@ use frame_support::dispatch::GetDispatchInfo;
use messages_relay::message_lane::MessageLane;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{Chain, Error as SubstrateError, TransactionSignScheme};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use sp_core::Pair;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
/// Millau-to-Rialto message lane.
type MillauMessagesToRialto = SubstrateMessageLaneToSubstrate<Millau, MillauSigningParams, Rialto, RialtoSigningParams>;
#[async_trait]
impl SubstrateMessageLane for MillauMessagesToRialto {
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str =
bp_rialto::TO_RIALTO_MESSAGES_DISPATCH_WEIGHT_METHOD;
@@ -54,22 +52,25 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD;
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD;
type SourceSignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;
type TargetSignedTransaction = <Rialto as TransactionSignScheme>::SignedTransaction;
type SourceChain = Millau;
type TargetChain = Rialto;
async fn make_messages_receiving_proof_transaction(
fn source_transactions_author(&self) -> bp_rialto::AccountId {
self.source_sign.signer.public().as_array_ref().clone().into()
}
fn make_messages_receiving_proof_transaction(
&self,
transaction_nonce: <Millau as Chain>::Index,
_generated_at_block: RialtoHeaderId,
proof: <Self as MessageLane>::MessagesReceivingProof,
) -> Result<Self::SourceSignedTransaction, SubstrateError> {
) -> Bytes {
let (relayers_state, proof) = proof;
let account_id = self.source_sign.signer.public().as_array_ref().clone().into();
let nonce = self.source_client.next_account_index(account_id).await?;
let call: millau_runtime::Call =
millau_runtime::MessagesCall::receive_messages_delivery_proof(proof, relayers_state).into();
let call_weight = call.get_dispatch_info().weight;
let genesis_hash = *self.source_client.genesis_hash();
let transaction = Millau::sign_transaction(genesis_hash, &self.source_sign.signer, nonce, call);
let transaction = Millau::sign_transaction(genesis_hash, &self.source_sign.signer, transaction_nonce, call);
log::trace!(
target: "bridge",
"Prepared Rialto -> Millau confirmation transaction. Weight: {}/{}, size: {}/{}",
@@ -78,15 +79,20 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
transaction.encode().len(),
bp_millau::max_extrinsic_size(),
);
Ok(transaction)
Bytes(transaction.encode())
}
async fn make_messages_delivery_transaction(
fn target_transactions_author(&self) -> bp_rialto::AccountId {
self.target_sign.signer.public().as_array_ref().clone().into()
}
fn make_messages_delivery_transaction(
&self,
transaction_nonce: <Rialto as Chain>::Index,
_generated_at_header: MillauHeaderId,
_nonces: RangeInclusive<MessageNonce>,
proof: <Self as MessageLane>::MessagesProof,
) -> Result<Self::TargetSignedTransaction, SubstrateError> {
) -> Bytes {
let (dispatch_weight, proof) = proof;
let FromBridgedChainMessagesProof {
ref nonces_start,
@@ -94,8 +100,6 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
..
} = proof;
let messages_count = nonces_end - nonces_start + 1;
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
let call: rialto_runtime::Call = rialto_runtime::MessagesCall::receive_messages_proof(
self.relayer_id_at_source.clone(),
proof,
@@ -105,7 +109,7 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
.into();
let call_weight = call.get_dispatch_info().weight;
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Rialto::sign_transaction(genesis_hash, &self.target_sign.signer, nonce, call);
let transaction = Rialto::sign_transaction(genesis_hash, &self.target_sign.signer, transaction_nonce, call);
log::trace!(
target: "bridge",
"Prepared Millau -> Rialto delivery transaction. Weight: {}/{}, size: {}/{}",
@@ -114,7 +118,7 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
transaction.encode().len(),
bp_rialto::max_extrinsic_size(),
);
Ok(transaction)
Bytes(transaction.encode())
}
}
@@ -53,24 +53,25 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> {
let rialto_client = rialto.into_client().await?;
let rialto_sign = rialto_sign.parse()?;
let rialto_signer_next_index = rialto_client
.next_account_index(rialto_sign.signer.public().into())
.await?;
crate::headers_initialize::initialize(millau_client, rialto_client.clone(), move |initialization_data| {
Ok(Bytes(
Rialto::sign_transaction(
*rialto_client.genesis_hash(),
&rialto_sign.signer,
rialto_signer_next_index,
rialto_runtime::SudoCall::sudo(Box::new(
rialto_runtime::BridgeGrandpaMillauCall::initialize(initialization_data).into(),
))
.into(),
crate::headers_initialize::initialize(
millau_client,
rialto_client.clone(),
rialto_sign.signer.public().into(),
move |transaction_nonce, initialization_data| {
Bytes(
Rialto::sign_transaction(
*rialto_client.genesis_hash(),
&rialto_sign.signer,
transaction_nonce,
rialto_runtime::SudoCall::sudo(Box::new(
rialto_runtime::BridgeGrandpaMillauCall::initialize(initialization_data).into(),
))
.into(),
)
.encode(),
)
.encode(),
))
})
},
)
.await;
}
cli::InitBridge::RialtoToMillau {
@@ -81,26 +82,28 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> {
let rialto_client = rialto.into_client().await?;
let millau_client = millau.into_client().await?;
let millau_sign = millau_sign.parse()?;
let millau_signer_next_index = millau_client
.next_account_index(millau_sign.signer.public().into())
.await?;
crate::headers_initialize::initialize(rialto_client, millau_client.clone(), move |initialization_data| {
let initialize_call = millau_runtime::BridgeGrandpaRialtoCall::<
millau_runtime::Runtime,
millau_runtime::RialtoGrandpaInstance,
>::initialize(initialization_data);
crate::headers_initialize::initialize(
rialto_client,
millau_client.clone(),
millau_sign.signer.public().into(),
move |transaction_nonce, initialization_data| {
let initialize_call = millau_runtime::BridgeGrandpaRialtoCall::<
millau_runtime::Runtime,
millau_runtime::RialtoGrandpaInstance,
>::initialize(initialization_data);
Ok(Bytes(
Millau::sign_transaction(
*millau_client.genesis_hash(),
&millau_sign.signer,
millau_signer_next_index,
millau_runtime::SudoCall::sudo(Box::new(initialize_call.into())).into(),
Bytes(
Millau::sign_transaction(
*millau_client.genesis_hash(),
&millau_sign.signer,
transaction_nonce,
millau_runtime::SudoCall::sudo(Box::new(initialize_call.into())).into(),
)
.encode(),
)
.encode(),
))
})
},
)
.await;
}
cli::InitBridge::WestendToMillau {
@@ -111,29 +114,31 @@ async fn run_init_bridge(command: cli::InitBridge) -> Result<(), String> {
let westend_client = westend.into_client().await?;
let millau_client = millau.into_client().await?;
let millau_sign = millau_sign.parse()?;
let millau_signer_next_index = millau_client
.next_account_index(millau_sign.signer.public().into())
.await?;
// at Westend -> Millau initialization we're not using sudo, because otherwise our deployments
// may fail, because we need to initialize both Rialto -> Millau and Westend -> Millau bridge.
// => since there's single possible sudo account, one of transaction may fail with duplicate nonce error
crate::headers_initialize::initialize(westend_client, millau_client.clone(), move |initialization_data| {
let initialize_call = millau_runtime::BridgeGrandpaWestendCall::<
millau_runtime::Runtime,
millau_runtime::WestendGrandpaInstance,
>::initialize(initialization_data);
crate::headers_initialize::initialize(
westend_client,
millau_client.clone(),
millau_sign.signer.public().into(),
move |transaction_nonce, initialization_data| {
let initialize_call = millau_runtime::BridgeGrandpaWestendCall::<
millau_runtime::Runtime,
millau_runtime::WestendGrandpaInstance,
>::initialize(initialization_data);
Ok(Bytes(
Millau::sign_transaction(
*millau_client.genesis_hash(),
&millau_sign.signer,
millau_signer_next_index,
initialize_call.into(),
Bytes(
Millau::sign_transaction(
*millau_client.genesis_hash(),
&millau_sign.signer,
transaction_nonce,
initialize_call.into(),
)
.encode(),
)
.encode(),
))
})
},
)
.await;
}
}
@@ -262,30 +267,32 @@ async fn run_send_message(command: cli::SendMessage) -> Result<(), String> {
})
.await?;
let millau_call = millau_runtime::Call::BridgeRialtoMessages(millau_runtime::MessagesCall::send_message(
lane, payload, fee,
));
millau_client
.submit_signed_extrinsic(millau_sign.signer.public().clone().into(), |transaction_nonce| {
let millau_call = millau_runtime::Call::BridgeRialtoMessages(
millau_runtime::MessagesCall::send_message(lane, payload, fee),
);
let signed_millau_call = Millau::sign_transaction(
*millau_client.genesis_hash(),
&millau_sign.signer,
millau_client
.next_account_index(millau_sign.signer.public().clone().into())
.await?,
millau_call,
)
.encode();
let signed_millau_call = Millau::sign_transaction(
*millau_client.genesis_hash(),
&millau_sign.signer,
transaction_nonce,
millau_call,
)
.encode();
log::info!(
target: "bridge",
"Sending message to Rialto. Size: {}. Dispatch weight: {}. Fee: {}",
signed_millau_call.len(),
dispatch_weight,
fee,
);
log::info!(target: "bridge", "Signed Millau Call: {:?}", HexBytes::encode(&signed_millau_call));
log::info!(
target: "bridge",
"Sending message to Rialto. Size: {}. Dispatch weight: {}. Fee: {}",
signed_millau_call.len(),
dispatch_weight,
fee,
);
log::info!(target: "bridge", "Signed Millau Call: {:?}", HexBytes::encode(&signed_millau_call));
millau_client.submit_extrinsic(Bytes(signed_millau_call)).await?;
Bytes(signed_millau_call)
})
.await?;
}
cli::SendMessage::RialtoToMillau {
rialto,
@@ -318,30 +325,32 @@ async fn run_send_message(command: cli::SendMessage) -> Result<(), String> {
})
.await?;
let rialto_call = rialto_runtime::Call::BridgeMillauMessages(rialto_runtime::MessagesCall::send_message(
lane, payload, fee,
));
rialto_client
.submit_signed_extrinsic(rialto_sign.signer.public().clone().into(), |transaction_nonce| {
let rialto_call = rialto_runtime::Call::BridgeMillauMessages(
rialto_runtime::MessagesCall::send_message(lane, payload, fee),
);
let signed_rialto_call = Rialto::sign_transaction(
*rialto_client.genesis_hash(),
&rialto_sign.signer,
rialto_client
.next_account_index(rialto_sign.signer.public().clone().into())
.await?,
rialto_call,
)
.encode();
let signed_rialto_call = Rialto::sign_transaction(
*rialto_client.genesis_hash(),
&rialto_sign.signer,
transaction_nonce,
rialto_call,
)
.encode();
log::info!(
target: "bridge",
"Sending message to Millau. Size: {}. Dispatch weight: {}. Fee: {}",
signed_rialto_call.len(),
dispatch_weight,
fee,
);
log::info!(target: "bridge", "Signed Rialto Call: {:?}", HexBytes::encode(&signed_rialto_call));
log::info!(
target: "bridge",
"Sending message to Millau. Size: {}. Dispatch weight: {}. Fee: {}",
signed_rialto_call.len(),
dispatch_weight,
fee,
);
log::info!(target: "bridge", "Signed Rialto Call: {:?}", HexBytes::encode(&signed_rialto_call));
rialto_client.submit_extrinsic(Bytes(signed_rialto_call)).await?;
Bytes(signed_rialto_call)
})
.await?;
}
}
Ok(())
@@ -19,29 +19,30 @@
use super::{MillauClient, RialtoClient};
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
use async_trait::async_trait;
use codec::Encode;
use relay_millau_client::{Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{Rialto, SyncHeader as RialtoSyncHeader};
use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme};
use sp_core::Pair;
use relay_substrate_client::{finality_source::Justification, Chain, TransactionSignScheme};
use sp_core::{Bytes, Pair};
/// Rialto-to-Millau finality sync pipeline.
pub(crate) type RialtoFinalityToMillau = SubstrateFinalityToSubstrate<Rialto, Millau, MillauSigningParams>;
#[async_trait]
impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD;
type SignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;
type TargetChain = Millau;
async fn make_submit_finality_proof_transaction(
fn transactions_author(&self) -> bp_millau::AccountId {
self.target_sign.signer.public().as_array_ref().clone().into()
}
fn make_submit_finality_proof_transaction(
&self,
transaction_nonce: <Millau as Chain>::Index,
header: RialtoSyncHeader,
proof: Justification<bp_rialto::BlockNumber>,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
) -> Bytes {
let call = millau_runtime::BridgeGrandpaRialtoCall::<
millau_runtime::Runtime,
millau_runtime::RialtoGrandpaInstance,
@@ -49,9 +50,9 @@ impl SubstrateFinalitySyncPipeline for RialtoFinalityToMillau {
.into();
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Millau::sign_transaction(genesis_hash, &self.target_sign.signer, nonce, call);
let transaction = Millau::sign_transaction(genesis_hash, &self.target_sign.signer, transaction_nonce, call);
Ok(transaction)
Bytes(transaction.encode())
}
}
@@ -21,7 +21,6 @@ use crate::messages_lane::{select_delivery_transaction_limits, SubstrateMessageL
use crate::messages_source::SubstrateMessagesSource;
use crate::messages_target::SubstrateMessagesTarget;
use async_trait::async_trait;
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE};
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
@@ -30,15 +29,14 @@ use frame_support::dispatch::GetDispatchInfo;
use messages_relay::message_lane::MessageLane;
use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams};
use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{Chain, Error as SubstrateError, TransactionSignScheme};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use sp_core::Pair;
use sp_core::{Bytes, Pair};
use std::{ops::RangeInclusive, time::Duration};
/// Rialto-to-Millau message lane.
type RialtoMessagesToMillau = SubstrateMessageLaneToSubstrate<Rialto, RialtoSigningParams, Millau, MillauSigningParams>;
#[async_trait]
impl SubstrateMessageLane for RialtoMessagesToMillau {
const OUTBOUND_LANE_MESSAGES_DISPATCH_WEIGHT_METHOD: &'static str =
bp_millau::TO_MILLAU_MESSAGES_DISPATCH_WEIGHT_METHOD;
@@ -54,22 +52,25 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rialto::BEST_FINALIZED_RIALTO_HEADER_METHOD;
const BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE: &'static str = bp_millau::BEST_FINALIZED_MILLAU_HEADER_METHOD;
type SourceSignedTransaction = <Rialto as TransactionSignScheme>::SignedTransaction;
type TargetSignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;
type SourceChain = Rialto;
type TargetChain = Millau;
async fn make_messages_receiving_proof_transaction(
fn source_transactions_author(&self) -> bp_rialto::AccountId {
self.source_sign.signer.public().as_array_ref().clone().into()
}
fn make_messages_receiving_proof_transaction(
&self,
transaction_nonce: <Rialto as Chain>::Index,
_generated_at_block: MillauHeaderId,
proof: <Self as MessageLane>::MessagesReceivingProof,
) -> Result<Self::SourceSignedTransaction, SubstrateError> {
) -> Bytes {
let (relayers_state, proof) = proof;
let account_id = self.source_sign.signer.public().as_array_ref().clone().into();
let nonce = self.source_client.next_account_index(account_id).await?;
let call: rialto_runtime::Call =
rialto_runtime::MessagesCall::receive_messages_delivery_proof(proof, relayers_state).into();
let call_weight = call.get_dispatch_info().weight;
let genesis_hash = *self.source_client.genesis_hash();
let transaction = Rialto::sign_transaction(genesis_hash, &self.source_sign.signer, nonce, call);
let transaction = Rialto::sign_transaction(genesis_hash, &self.source_sign.signer, transaction_nonce, call);
log::trace!(
target: "bridge",
"Prepared Millau -> Rialto confirmation transaction. Weight: {}/{}, size: {}/{}",
@@ -78,15 +79,20 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
transaction.encode().len(),
bp_rialto::max_extrinsic_size(),
);
Ok(transaction)
Bytes(transaction.encode())
}
async fn make_messages_delivery_transaction(
fn target_transactions_author(&self) -> bp_rialto::AccountId {
self.target_sign.signer.public().as_array_ref().clone().into()
}
fn make_messages_delivery_transaction(
&self,
transaction_nonce: <Millau as Chain>::Index,
_generated_at_header: RialtoHeaderId,
_nonces: RangeInclusive<MessageNonce>,
proof: <Self as MessageLane>::MessagesProof,
) -> Result<Self::TargetSignedTransaction, SubstrateError> {
) -> Bytes {
let (dispatch_weight, proof) = proof;
let FromBridgedChainMessagesProof {
ref nonces_start,
@@ -94,8 +100,6 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
..
} = proof;
let messages_count = nonces_end - nonces_start + 1;
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
let call: millau_runtime::Call = millau_runtime::MessagesCall::receive_messages_proof(
self.relayer_id_at_source.clone(),
proof,
@@ -105,7 +109,7 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
.into();
let call_weight = call.get_dispatch_info().weight;
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Millau::sign_transaction(genesis_hash, &self.target_sign.signer, nonce, call);
let transaction = Millau::sign_transaction(genesis_hash, &self.target_sign.signer, transaction_nonce, call);
log::trace!(
target: "bridge",
"Prepared Rialto -> Millau delivery transaction. Weight: {}/{}, size: {}/{}",
@@ -114,7 +118,7 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
transaction.encode().len(),
bp_millau::max_extrinsic_size(),
);
Ok(transaction)
Bytes(transaction.encode())
}
}
@@ -19,29 +19,30 @@
use super::{MillauClient, WestendClient};
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
use async_trait::async_trait;
use codec::Encode;
use relay_millau_client::{Millau, SigningParams as MillauSigningParams};
use relay_substrate_client::{finality_source::Justification, Error as SubstrateError, TransactionSignScheme};
use relay_substrate_client::{finality_source::Justification, Chain, TransactionSignScheme};
use relay_westend_client::{SyncHeader as WestendSyncHeader, Westend};
use sp_core::Pair;
use sp_core::{Bytes, Pair};
/// Westend-to-Millau finality sync pipeline.
pub(crate) type WestendFinalityToMillau = SubstrateFinalityToSubstrate<Westend, Millau, MillauSigningParams>;
#[async_trait]
impl SubstrateFinalitySyncPipeline for WestendFinalityToMillau {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_westend::BEST_FINALIZED_WESTEND_HEADER_METHOD;
type SignedTransaction = <Millau as TransactionSignScheme>::SignedTransaction;
type TargetChain = Millau;
async fn make_submit_finality_proof_transaction(
fn transactions_author(&self) -> bp_millau::AccountId {
self.target_sign.signer.public().as_array_ref().clone().into()
}
fn make_submit_finality_proof_transaction(
&self,
transaction_nonce: <Millau as Chain>::Index,
header: WestendSyncHeader,
proof: Justification<bp_westend::BlockNumber>,
) -> Result<Self::SignedTransaction, SubstrateError> {
let account_id = self.target_sign.signer.public().as_array_ref().clone().into();
let nonce = self.target_client.next_account_index(account_id).await?;
) -> Bytes {
let call = millau_runtime::BridgeGrandpaWestendCall::<
millau_runtime::Runtime,
millau_runtime::WestendGrandpaInstance,
@@ -49,9 +50,9 @@ impl SubstrateFinalitySyncPipeline for WestendFinalityToMillau {
.into();
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Millau::sign_transaction(genesis_hash, &self.target_sign.signer, nonce, call);
let transaction = Millau::sign_transaction(genesis_hash, &self.target_sign.signer, transaction_nonce, call);
Ok(transaction)
Bytes(transaction.encode())
}
}
+30 -3
View File
@@ -20,6 +20,7 @@ use crate::chain::{Chain, ChainWithBalances};
use crate::rpc::{Substrate, SubstrateMessages};
use crate::{ConnectionParams, Error, Result};
use async_std::sync::{Arc, Mutex};
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::InstanceId;
use codec::Decode;
@@ -52,6 +53,10 @@ pub struct Client<C: Chain> {
client: RpcClient,
/// Genesis block hash.
genesis_hash: C::Hash,
/// If several tasks are submitting their transactions simultaneously using `submit_signed_extrinsic`
/// method, they may get the same transaction nonce. So one of transactions will be rejected
/// from the pool. This lock is here to prevent situations like that.
submit_signed_extrinsic_lock: Arc<Mutex<()>>,
}
impl<C: Chain> Clone for Client<C> {
@@ -60,6 +65,7 @@ impl<C: Chain> Clone for Client<C> {
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
}
}
}
@@ -84,6 +90,7 @@ impl<C: Chain> Client<C> {
params,
client,
genesis_hash,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
})
}
@@ -192,15 +199,35 @@ impl<C: Chain> Client<C> {
Ok(Substrate::<C>::system_account_next_index(&self.client, account).await?)
}
/// Submit an extrinsic for inclusion in a block.
/// Submit unsigned extrinsic for inclusion in a block.
///
/// Note: The given transaction does not need be SCALE encoded beforehand.
pub async fn submit_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
let tx_hash = Substrate::<C>::author_submit_extrinsic(&self.client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash);
Ok(tx_hash)
}
/// Submit an extrinsic signed by given account.
///
/// All calls of this method are synchronized, so there can't be more than one active
/// `submit_signed_extrinsic()` call. This guarantees that no nonces collision may happen
/// if all client instances are clones of the same initial `Client`.
///
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_signed_extrinsic(
&self,
extrinsic_signer: C::AccountId,
prepare_extrinsic: impl FnOnce(C::Index) -> Bytes,
) -> Result<C::Hash> {
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
let extrinsic = prepare_extrinsic(transaction_nonce);
let tx_hash = Substrate::<C>::author_submit_extrinsic(&self.client, extrinsic).await?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(tx_hash)
}
/// Get the GRANDPA authority set at given block.
pub async fn grandpa_authorities_set(&self, block: C::Hash) -> Result<OpaqueGrandpaAuthoritiesSet> {
let call = SUB_API_GRANDPA_AUTHORITIES.to_string();
@@ -139,6 +139,9 @@ pub trait SourceClient<P: MessageLane>: RelayClient {
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<(), Self::Error>;
/// Activate (or deactivate) headers relay that relays target headers to source node.
async fn activate_target_to_source_headers_relay(&self, activate: bool);
}
/// Target client trait.
@@ -177,6 +180,9 @@ pub trait TargetClient<P: MessageLane>: RelayClient {
nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof,
) -> Result<RangeInclusive<MessageNonce>, Self::Error>;
/// Activate (or deactivate) headers relay that relays source headers to target node.
async fn activate_source_to_target_headers_relay(&self, activate: bool);
}
/// State of the client.
@@ -463,6 +469,8 @@ pub(crate) mod tests {
target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce,
submitted_messages_proofs: Vec<TestMessagesProof>,
is_target_to_source_headers_relay_activated: bool,
is_source_to_target_headers_relay_activated: bool,
}
#[derive(Clone)]
@@ -567,6 +575,12 @@ pub(crate) mod tests {
data.source_latest_confirmed_received_nonce = proof;
Ok(())
}
async fn activate_target_to_source_headers_relay(&self, activate: bool) {
let mut data = self.data.lock();
data.is_target_to_source_headers_relay_activated = activate;
(self.tick)(&mut *data);
}
}
#[derive(Clone)]
@@ -665,6 +679,12 @@ pub(crate) mod tests {
data.submitted_messages_proofs.push(proof);
Ok(nonces)
}
async fn activate_source_to_target_headers_relay(&self, activate: bool) {
let mut data = self.data.lock();
data.is_source_to_target_headers_relay_activated = activate;
(self.tick)(&mut *data);
}
}
fn run_loop_test(
@@ -778,8 +798,19 @@ pub(crate) mod tests {
target_latest_received_nonce: 0,
..Default::default()
},
Arc::new(|_: &mut TestClientData| {}),
Arc::new(|data: &mut TestClientData| {
// headers relay must only be started when we need new target headers at source node
if data.is_target_to_source_headers_relay_activated {
assert!(data.source_state.best_finalized_peer_at_best_self.0 < data.target_state.best_self.0);
data.is_target_to_source_headers_relay_activated = false;
}
}),
Arc::new(move |data: &mut TestClientData| {
// headers relay must only be started when we need new source headers at target node
if data.is_target_to_source_headers_relay_activated {
assert!(data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_self.0);
data.is_target_to_source_headers_relay_activated = false;
}
// syncing source headers -> target chain (all at once)
if data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_finalized_self.0 {
data.target_state.best_finalized_peer_at_best_self = data.source_state.best_finalized_self;
@@ -166,6 +166,10 @@ where
type Error = C::Error;
type TargetNoncesData = DeliveryRaceTargetNoncesData;
async fn require_more_source_headers(&self, activate: bool) {
self.client.activate_source_to_target_headers_relay(activate).await
}
async fn nonces(
&self,
at_block: TargetHeaderIdOf<P>,
@@ -123,6 +123,9 @@ pub trait TargetClient<P: MessageRace> {
/// Type of the additional data from the target client, used by the race.
type TargetNoncesData: std::fmt::Debug;
/// Ask headers relay to relay more headers from race source to race target.
async fn require_more_source_headers(&self, activate: bool);
/// Return nonces that are known to the target client.
async fn nonces(
&self,
@@ -216,6 +219,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
TargetNoncesData = TC::TargetNoncesData,
>,
) -> Result<(), FailedClient> {
let mut is_strategy_empty = true;
let mut progress_context = Instant::now();
let mut race_state = RaceState::default();
let mut stall_countdown = Instant::now();
@@ -404,6 +408,13 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
progress_context = print_race_progress::<P, _>(progress_context, &strategy);
// ask for more headers if we have nonces to deliver
let prev_is_strategy_empty = is_strategy_empty;
is_strategy_empty = strategy.is_empty();
if is_strategy_empty != prev_is_strategy_empty {
race_target.require_more_source_headers(!is_strategy_empty).await;
}
if stall_countdown.elapsed() > stall_timeout {
log::warn!(
target: "bridge",
@@ -159,6 +159,10 @@ where
type Error = C::Error;
type TargetNoncesData = ();
async fn require_more_source_headers(&self, activate: bool) {
self.client.activate_target_to_source_headers_relay(activate).await
}
async fn nonces(
&self,
at_block: SourceHeaderIdOf<P>,