Merge commit 'f9c24ef0db390c355241445af37a5c7999a7dc66' into hc-bump-bridges-subtree-take-2

This commit is contained in:
Hernando Castano
2021-05-04 15:27:09 -04:00
82 changed files with 2056 additions and 816 deletions
@@ -23,10 +23,12 @@ structopt = "0.3"
bp-header-chain = { path = "../../primitives/header-chain" }
bp-kusama = { path = "../../primitives/chain-kusama" }
bp-messages = { path = "../../primitives/messages" }
bp-message-dispatch = { path = "../../primitives/message-dispatch" }
bp-millau = { path = "../../primitives/chain-millau" }
bp-polkadot = { path = "../../primitives/chain-polkadot" }
bp-rialto = { path = "../../primitives/chain-rialto" }
bp-rococo = { path = "../../primitives/chain-rococo" }
bp-wococo = { path = "../../primitives/chain-wococo" }
bp-runtime = { path = "../../primitives/runtime" }
bp-westend = { path = "../../primitives/chain-westend" }
bridge-runtime-common = { path = "../../bin/runtime-common" }
@@ -35,13 +37,13 @@ finality-relay = { path = "../finality" }
headers-relay = { path = "../headers" }
messages-relay = { path = "../messages" }
millau-runtime = { path = "../../bin/millau/runtime" }
pallet-bridge-dispatch = { path = "../../modules/dispatch" }
pallet-bridge-messages = { path = "../../modules/messages" }
relay-kusama-client = { path = "../client-kusama" }
relay-millau-client = { path = "../client-millau" }
relay-polkadot-client = { path = "../client-polkadot" }
relay-rialto-client = { path = "../client-rialto" }
relay-rococo-client = { path = "../client-rococo" }
relay-wococo-client = { path = "../client-wococo" }
relay-substrate-client = { path = "../client-substrate" }
relay-utils = { path = "../utils" }
relay-westend-client = { path = "../client-westend" }
@@ -21,9 +21,9 @@ use crate::cli::{
encode_call::{self, Call, CliEncodeCall},
encode_message, send_message, CliChain,
};
use bp_message_dispatch::{CallOrigin, MessagePayload};
use codec::Decode;
use frame_support::weights::{GetDispatchInfo, Weight};
use pallet_bridge_dispatch::{CallOrigin, MessagePayload};
use relay_millau_client::Millau;
use sp_version::RuntimeVersion;
@@ -20,14 +20,15 @@ pub mod millau_headers_to_rialto;
pub mod millau_messages_to_rialto;
pub mod rialto_headers_to_millau;
pub mod rialto_messages_to_millau;
pub mod rococo_headers_to_westend;
pub mod rococo_headers_to_wococo;
pub mod westend_headers_to_millau;
pub mod westend_headers_to_rococo;
pub mod wococo_headers_to_rococo;
mod millau;
mod rialto;
mod rococo;
mod westend;
mod wococo;
use relay_utils::metrics::{FloatJsonValueMetric, MetricsParams};
@@ -131,7 +132,7 @@ mod tests {
let payload = send_message::message_payload(
Default::default(),
call.get_dispatch_info().weight,
pallet_bridge_dispatch::CallOrigin::SourceRoot,
bp_message_dispatch::CallOrigin::SourceRoot,
&call,
);
assert_eq!(Millau::verify_message(&payload), Ok(()));
@@ -141,7 +142,7 @@ mod tests {
let payload = send_message::message_payload(
Default::default(),
call.get_dispatch_info().weight,
pallet_bridge_dispatch::CallOrigin::SourceRoot,
bp_message_dispatch::CallOrigin::SourceRoot,
&call,
);
assert!(Millau::verify_message(&payload).is_err());
@@ -168,7 +169,7 @@ mod tests {
let payload = send_message::message_payload(
Default::default(),
maximal_dispatch_weight,
pallet_bridge_dispatch::CallOrigin::SourceRoot,
bp_message_dispatch::CallOrigin::SourceRoot,
&call,
);
assert_eq!(Millau::verify_message(&payload), Ok(()));
@@ -176,7 +177,7 @@ mod tests {
let payload = send_message::message_payload(
Default::default(),
maximal_dispatch_weight + 1,
pallet_bridge_dispatch::CallOrigin::SourceRoot,
bp_message_dispatch::CallOrigin::SourceRoot,
&call,
);
assert!(Millau::verify_message(&payload).is_err());
@@ -193,7 +194,7 @@ mod tests {
let payload = send_message::message_payload(
Default::default(),
maximal_dispatch_weight,
pallet_bridge_dispatch::CallOrigin::SourceRoot,
bp_message_dispatch::CallOrigin::SourceRoot,
&call,
);
assert_eq!(Rialto::verify_message(&payload), Ok(()));
@@ -201,7 +202,7 @@ mod tests {
let payload = send_message::message_payload(
Default::default(),
maximal_dispatch_weight + 1,
pallet_bridge_dispatch::CallOrigin::SourceRoot,
bp_message_dispatch::CallOrigin::SourceRoot,
&call,
);
assert!(Rialto::verify_message(&payload).is_err());
@@ -270,7 +271,7 @@ mod rococo_tests {
votes_ancestries: vec![],
};
let actual = bp_rococo::BridgeGrandpaWestendCall::submit_finality_proof(header.clone(), justification.clone());
let actual = bp_rococo::BridgeGrandpaWococoCall::submit_finality_proof(header.clone(), justification.clone());
let expected = millau_runtime::BridgeGrandpaRialtoCall::<millau_runtime::Runtime>::submit_finality_proof(
header,
justification,
@@ -21,9 +21,9 @@ use crate::cli::{
encode_call::{self, Call, CliEncodeCall},
encode_message, send_message, CliChain,
};
use bp_message_dispatch::{CallOrigin, MessagePayload};
use codec::Decode;
use frame_support::weights::{GetDispatchInfo, Weight};
use pallet_bridge_dispatch::{CallOrigin, MessagePayload};
use relay_rialto_client::Rialto;
use sp_version::RuntimeVersion;
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Rococo-to-Westend headers sync entrypoint.
//! Rococo-to-Wococo headers sync entrypoint.
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
@@ -23,37 +23,37 @@ use codec::Encode;
use relay_rococo_client::{Rococo, SyncHeader as RococoSyncHeader};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use relay_westend_client::{SigningParams as WestendSigningParams, Westend};
use relay_wococo_client::{SigningParams as WococoSigningParams, Wococo};
use sp_core::{Bytes, Pair};
/// Rococo-to-Westend finality sync pipeline.
pub(crate) type RococoFinalityToWestend = SubstrateFinalityToSubstrate<Rococo, Westend, WestendSigningParams>;
/// Rococo-to-Wococo finality sync pipeline.
pub(crate) type RococoFinalityToWococo = SubstrateFinalityToSubstrate<Rococo, Wococo, WococoSigningParams>;
impl SubstrateFinalitySyncPipeline for RococoFinalityToWestend {
impl SubstrateFinalitySyncPipeline for RococoFinalityToWococo {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_rococo::BEST_FINALIZED_ROCOCO_HEADER_METHOD;
type TargetChain = Westend;
type TargetChain = Wococo;
fn customize_metrics(params: MetricsParams) -> anyhow::Result<MetricsParams> {
crate::chains::add_polkadot_kusama_price_metrics::<Self>(params)
}
fn transactions_author(&self) -> bp_westend::AccountId {
fn transactions_author(&self) -> bp_wococo::AccountId {
(*self.target_sign.public().as_array_ref()).into()
}
fn make_submit_finality_proof_transaction(
&self,
transaction_nonce: <Westend as Chain>::Index,
transaction_nonce: <Wococo as Chain>::Index,
header: RococoSyncHeader,
proof: GrandpaJustification<bp_rococo::Header>,
) -> Bytes {
let call = bp_westend::Call::BridgeGrandpaRococo(bp_westend::BridgeGrandpaRococoCall::submit_finality_proof(
let call = bp_wococo::Call::BridgeGrandpaRococo(bp_wococo::BridgeGrandpaRococoCall::submit_finality_proof(
header.into_inner(),
proof,
));
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Westend::sign_transaction(genesis_hash, &self.target_sign, transaction_nonce, call);
let transaction = Wococo::sign_transaction(genesis_hash, &self.target_sign, transaction_nonce, call);
Bytes(transaction.encode())
}
@@ -48,7 +48,7 @@ impl SubstrateFinalitySyncPipeline for WestendFinalityToRococo {
header: WestendSyncHeader,
proof: GrandpaJustification<bp_westend::Header>,
) -> Bytes {
let call = bp_rococo::Call::BridgeGrandpaWestend(bp_rococo::BridgeGrandpaWestendCall::submit_finality_proof(
let call = bp_rococo::Call::BridgeGrandpaWestend(bp_rococo::BridgeGrandpaCall::submit_finality_proof(
header.into_inner(),
proof,
));
@@ -0,0 +1,39 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::cli::{encode_message, CliChain};
use frame_support::weights::Weight;
use relay_wococo_client::Wococo;
use sp_version::RuntimeVersion;
impl CliChain for Wococo {
const RUNTIME_VERSION: RuntimeVersion = bp_wococo::VERSION;
type KeyPair = sp_core::sr25519::Pair;
type MessagePayload = ();
fn ss58_format() -> u16 {
42
}
fn max_extrinsic_weight() -> Weight {
0
}
fn encode_message(_message: encode_message::MessagePayload) -> Result<Self::MessagePayload, String> {
Err("Sending messages from Wococo is not yet supported.".into())
}
}
@@ -0,0 +1,60 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Wococo-to-Rococo headers sync entrypoint.
use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityToSubstrate};
use bp_header_chain::justification::GrandpaJustification;
use codec::Encode;
use relay_rococo_client::{Rococo, SigningParams as RococoSigningParams};
use relay_substrate_client::{Chain, TransactionSignScheme};
use relay_utils::metrics::MetricsParams;
use relay_wococo_client::{SyncHeader as WococoSyncHeader, Wococo};
use sp_core::{Bytes, Pair};
/// Wococo-to-Rococo finality sync pipeline.
pub(crate) type WococoFinalityToRococo = SubstrateFinalityToSubstrate<Wococo, Rococo, RococoSigningParams>;
impl SubstrateFinalitySyncPipeline for WococoFinalityToRococo {
const BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET: &'static str = bp_wococo::BEST_FINALIZED_WOCOCO_HEADER_METHOD;
type TargetChain = Rococo;
fn customize_metrics(params: MetricsParams) -> anyhow::Result<MetricsParams> {
crate::chains::add_polkadot_kusama_price_metrics::<Self>(params)
}
fn transactions_author(&self) -> bp_rococo::AccountId {
(*self.target_sign.public().as_array_ref()).into()
}
fn make_submit_finality_proof_transaction(
&self,
transaction_nonce: <Rococo as Chain>::Index,
header: WococoSyncHeader,
proof: GrandpaJustification<bp_wococo::Header>,
) -> Bytes {
let call = bp_rococo::Call::BridgeGrandpaWococo(bp_rococo::BridgeGrandpaWococoCall::submit_finality_proof(
header.into_inner(),
proof,
));
let genesis_hash = *self.target_client.genesis_hash();
let transaction = Rococo::sign_transaction(genesis_hash, &self.target_sign, transaction_nonce, call);
Bytes(transaction.encode())
}
}
@@ -53,7 +53,7 @@ macro_rules! select_full_bridge {
// Derive-account
#[allow(unused_imports)]
use bp_millau::derive_account_from_rialto_id as derive_account;
use bp_rialto::derive_account_from_millau_id as derive_account;
// Relay-messages
#[allow(unused_imports)]
@@ -75,7 +75,7 @@ macro_rules! select_full_bridge {
// Derive-account
#[allow(unused_imports)]
use bp_rialto::derive_account_from_millau_id as derive_account;
use bp_millau::derive_account_from_rialto_id as derive_account;
// Relay-messages
#[allow(unused_imports)]
@@ -91,11 +91,11 @@ mod tests {
assert_eq!(
format!("{}", rialto_derived),
"73gLnUwrAdH4vMjbXCiNEpgyz1PLk9JxCaY4cKzvfSZT73KE"
"74GNQjmkcfstRftSQPJgMREchqHM56EvAUXRc266cZ1NYVW5"
);
assert_eq!(
format!("{}", millau_derived),
"5rpTJqGv1BPAYy2sXzkPpc3Wx1ZpQtgfuBsrDpNV4HsXAmbi"
"5rERgaT1Z8nM3et2epA5i1VtEBfp5wkhwHtVE8HK7BRbjAH2"
);
assert_eq!(millau_derived, millau2_derived);
}
@@ -44,8 +44,8 @@ arg_enum! {
MillauToRialto,
RialtoToMillau,
WestendToMillau,
WestendToRococo,
RococoToWestend,
RococoToWococo,
WococoToRococo,
}
}
@@ -102,26 +102,26 @@ macro_rules! select_bridge {
$generic
}
InitBridgeName::WestendToRococo => {
type Source = relay_westend_client::Westend;
InitBridgeName::RococoToWococo => {
type Source = relay_rococo_client::Rococo;
type Target = relay_wococo_client::Wococo;
fn encode_init_bridge(
init_data: InitializationData<<Source as ChainBase>::Header>,
) -> <Target as Chain>::Call {
bp_wococo::Call::BridgeGrandpaRococo(bp_wococo::BridgeGrandpaRococoCall::initialize(init_data))
}
$generic
}
InitBridgeName::WococoToRococo => {
type Source = relay_wococo_client::Wococo;
type Target = relay_rococo_client::Rococo;
fn encode_init_bridge(
init_data: InitializationData<<Source as ChainBase>::Header>,
) -> <Target as Chain>::Call {
bp_rococo::Call::BridgeGrandpaWestend(bp_rococo::BridgeGrandpaWestendCall::initialize(init_data))
}
$generic
}
InitBridgeName::RococoToWestend => {
type Source = relay_rococo_client::Rococo;
type Target = relay_westend_client::Westend;
fn encode_init_bridge(
init_data: InitializationData<<Source as ChainBase>::Header>,
) -> <Target as Chain>::Call {
bp_westend::Call::BridgeGrandpaRococo(bp_westend::BridgeGrandpaRococoCall::initialize(init_data))
bp_rococo::Call::BridgeGrandpaWococo(bp_rococo::BridgeGrandpaWococoCall::initialize(init_data))
}
$generic
@@ -89,8 +89,23 @@ pub enum Command {
}
impl Command {
// Initialize logger depending on the command.
fn init_logger(&self) {
use relay_utils::initialize::{initialize_logger, initialize_relay};
match self {
Self::RelayHeaders(_) | Self::RelayMessages(_) | Self::RelayHeadersAndMessages(_) | Self::InitBridge(_) => {
initialize_relay();
}
_ => {
initialize_logger(false);
}
}
}
/// Run the command.
pub async fn run(self) -> anyhow::Result<()> {
self.init_logger();
match self {
Self::RelayHeaders(arg) => arg.run().await?,
Self::RelayMessages(arg) => arg.run().await?,
@@ -42,8 +42,8 @@ arg_enum! {
MillauToRialto,
RialtoToMillau,
WestendToMillau,
WestendToRococo,
RococoToWestend,
RococoToWococo,
WococoToRococo,
}
}
@@ -71,17 +71,17 @@ macro_rules! select_bridge {
$generic
}
RelayHeadersBridge::WestendToRococo => {
type Source = relay_westend_client::Westend;
type Target = relay_rococo_client::Rococo;
type Finality = crate::chains::westend_headers_to_rococo::WestendFinalityToRococo;
RelayHeadersBridge::RococoToWococo => {
type Source = relay_rococo_client::Rococo;
type Target = relay_wococo_client::Wococo;
type Finality = crate::chains::rococo_headers_to_wococo::RococoFinalityToWococo;
$generic
}
RelayHeadersBridge::RococoToWestend => {
type Source = relay_rococo_client::Rococo;
type Target = relay_westend_client::Westend;
type Finality = crate::chains::rococo_headers_to_westend::RococoFinalityToWestend;
RelayHeadersBridge::WococoToRococo => {
type Source = relay_wococo_client::Wococo;
type Target = relay_rococo_client::Rococo;
type Finality = crate::chains::wococo_headers_to_rococo::WococoFinalityToRococo;
$generic
}
@@ -102,6 +102,7 @@ impl RelayHeaders {
Finality::new(target_client.clone(), target_sign),
source_client,
target_client,
false,
metrics_params,
)
.await
@@ -96,6 +96,9 @@ macro_rules! select_bridge {
type LeftToRightMessages = crate::chains::millau_messages_to_rialto::MillauMessagesToRialto;
type RightToLeftMessages = crate::chains::rialto_messages_to_millau::RialtoMessagesToMillau;
const MAX_MISSING_LEFT_HEADERS_AT_RIGHT: bp_millau::BlockNumber = bp_millau::SESSION_LENGTH;
const MAX_MISSING_RIGHT_HEADERS_AT_LEFT: bp_rialto::BlockNumber = bp_rialto::SESSION_LENGTH;
use crate::chains::millau_messages_to_rialto::run as left_to_right_messages;
use crate::chains::rialto_messages_to_millau::run as right_to_left_messages;
@@ -131,11 +134,13 @@ impl RelayHeadersAndMessages {
left_client.clone(),
right_client.clone(),
LeftToRightFinality::new(right_client.clone(), right_sign.clone()),
MAX_MISSING_LEFT_HEADERS_AT_RIGHT,
);
let right_to_left_on_demand_headers = OnDemandHeadersRelay::new(
right_client.clone(),
left_client.clone(),
RightToLeftFinality::new(left_client.clone(), left_sign.clone()),
MAX_MISSING_RIGHT_HEADERS_AT_LEFT,
);
let left_to_right_messages = left_to_right_messages(MessagesRelayParams {
@@ -21,9 +21,9 @@ use crate::cli::{
Balance, CliChain, ExplicitOrMaximal, HexBytes, HexLaneId, Origins, SourceConnectionParams, SourceSigningParams,
TargetSigningParams,
};
use bp_message_dispatch::{CallOrigin, MessagePayload};
use codec::Encode;
use frame_support::{dispatch::GetDispatchInfo, weights::Weight};
use pallet_bridge_dispatch::{CallOrigin, MessagePayload};
use relay_substrate_client::{Chain, TransactionSignScheme};
use sp_core::{Bytes, Pair};
use sp_runtime::{traits::IdentifyAccount, AccountId32, MultiSignature, MultiSigner};
@@ -96,7 +96,7 @@ where
SourceChain: Clone + Chain + Debug,
BlockNumberOf<SourceChain>: BlockNumberBase,
TargetChain: Clone + Chain + Debug,
TargetSign: Clone + Send + Sync,
TargetSign: 'static + Clone + Send + Sync,
{
const SOURCE_NAME: &'static str = SourceChain::NAME;
const TARGET_NAME: &'static str = TargetChain::NAME;
@@ -112,6 +112,7 @@ pub async fn run<SourceChain, TargetChain, P>(
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
is_on_demand_task: bool,
metrics_params: MetricsParams,
) -> anyhow::Result<()>
where
@@ -137,6 +138,7 @@ where
FinalitySource::new(source_client),
SubstrateFinalityTarget::new(target_client, pipeline),
FinalitySyncParams {
is_on_demand_task,
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
stall_timeout: STALL_TIMEOUT,
@@ -18,8 +18,6 @@
#![warn(missing_docs)]
use relay_utils::initialize::initialize_logger;
mod chains;
mod cli;
mod finality_pipeline;
@@ -31,7 +29,6 @@ mod messages_target;
mod on_demand_headers;
fn main() {
initialize_logger(false);
let command = cli::parse_args();
let run = command.run();
let result = async_std::task::block_on(run);
@@ -203,7 +203,7 @@ mod tests {
// reserved for messages dispatch allows dispatch of non-trivial messages.
//
// Any significant change in this values should attract additional attention.
(1020, 216_583_333_334),
(1013, 216_583_333_334),
);
}
}
@@ -93,7 +93,7 @@ impl<C, P, R, I> RelayClient for SubstrateMessagesSource<C, P, R, I>
where
C: Chain,
P: SubstrateMessageLane,
R: Send + Sync,
R: 'static + Send + Sync,
I: Send + Sync + Instance,
{
type Error = SubstrateError;
@@ -93,7 +93,7 @@ impl<C, P, R, I> RelayClient for SubstrateMessagesTarget<C, P, R, I>
where
C: Chain,
P: SubstrateMessageLane,
R: Send + Sync,
R: 'static + Send + Sync,
I: Send + Sync + Instance,
{
type Error = SubstrateError;
@@ -20,14 +20,22 @@ use crate::finality_pipeline::{SubstrateFinalitySyncPipeline, SubstrateFinalityT
use crate::finality_target::SubstrateFinalityTarget;
use bp_header_chain::justification::GrandpaJustification;
use finality_relay::TargetClient as FinalityTargetClient;
use finality_relay::{
FinalitySyncPipeline, SourceClient as FinalitySourceClient, TargetClient as FinalityTargetClient,
};
use futures::{
channel::{mpsc, oneshot},
select, FutureExt, StreamExt,
};
use num_traits::Zero;
use relay_substrate_client::{BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader};
use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
use num_traits::{CheckedSub, Zero};
use relay_substrate_client::{
finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf,
SyncHeader,
};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId,
MaybeConnectionError,
};
use std::fmt::Debug;
/// On-demand Substrate <-> Substrate headers relay.
@@ -49,6 +57,7 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
maximal_headers_difference: SourceChain::BlockNumber,
) -> Self
where
SourceChain: Chain + Debug,
@@ -68,7 +77,14 @@ impl<SourceChain: Chain> OnDemandHeadersRelay<SourceChain> {
{
let (required_header_tx, required_header_rx) = mpsc::channel(1);
async_std::task::spawn(async move {
background_task(source_client, target_client, pipeline, required_header_rx).await;
background_task(
source_client,
target_client,
pipeline,
maximal_headers_difference,
required_header_rx,
)
.await;
});
let background_task_name = format!(
@@ -100,6 +116,7 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
pipeline: SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
maximal_headers_difference: SourceChain::BlockNumber,
mut required_header_rx: mpsc::Receiver<HeaderIdOf<SourceChain>>,
) where
SourceChain: Chain + Debug,
@@ -118,7 +135,11 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
{
let relay_task_name = on_demand_headers_relay_name::<SourceChain, TargetChain>();
let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
let mut finality_source = SubstrateFinalitySource::<
_,
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
>::new(source_client.clone());
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
let mut active_headers_relay = None;
let mut required_header_number = Zero::zero();
@@ -150,30 +171,45 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
},
}
// read best finalized source block from target
let available_header_number = match finality_target.best_finalized_source_block_number().await {
Ok(available_header_number) => available_header_number,
Err(error) => {
log::error!(
target: "bridge",
"Failed to read best finalized {} header from {} in {} relay: {:?}",
SourceChain::NAME,
TargetChain::NAME,
relay_task_name,
error,
);
// read best finalized source header number from source
let best_finalized_source_header_at_source =
best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Source,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue;
}
// we don't know what's happening with target client, so better to stop on-demand relay than
// submit unneeded transactions
// => assume that required header is known to the target node
required_header_number
}
};
// read best finalized source header number from target
let best_finalized_source_header_at_target =
best_finalized_source_header_at_target::<SourceChain, _, _>(&finality_target, &relay_task_name).await;
if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Target,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue;
}
// start or stop headers relay if required
let activate = required_header_number > available_header_number;
match (activate, active_headers_relay.is_some()) {
(true, false) => {
let action = select_on_demand_relay_action::<SourceChain>(
best_finalized_source_header_at_source.ok(),
best_finalized_source_header_at_target.ok(),
required_header_number,
maximal_headers_difference,
&relay_task_name,
active_headers_relay.is_some(),
);
match action {
OnDemandRelayAction::Start => {
let (relay_exited_tx, new_relay_exited_rx) = oneshot::channel();
active_headers_relay = start_on_demand_headers_relay(
relay_task_name.clone(),
@@ -186,14 +222,127 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
relay_exited_rx = new_relay_exited_rx.right_future();
}
}
(false, true) => {
OnDemandRelayAction::Stop => {
stop_on_demand_headers_relay(active_headers_relay.take()).await;
}
_ => (),
OnDemandRelayAction::None => (),
}
}
}
/// Read best finalized source block number from source client.
///
/// Returns `None` if we have failed to read the number.
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
finality_source: &SubstrateFinalitySource<SourceChain, P>,
relay_task_name: &str,
) -> Result<SourceChain::BlockNumber, <SubstrateFinalitySource<SourceChain, P> as RelayClient>::Error>
where
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
{
finality_source.best_finalized_block_number().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from source in {} relay: {:?}",
relay_task_name,
error,
);
error
})
}
/// Read best finalized source block number from target client.
///
/// Returns `None` if we have failed to read the number.
async fn best_finalized_source_header_at_target<SourceChain: Chain, TargetChain: Chain, P>(
finality_target: &SubstrateFinalityTarget<TargetChain, P>,
relay_task_name: &str,
) -> Result<SourceChain::BlockNumber, <SubstrateFinalityTarget<TargetChain, P> as RelayClient>::Error>
where
SubstrateFinalityTarget<TargetChain, P>: FinalityTargetClient<P>,
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
{
finality_target
.best_finalized_source_block_number()
.await
.map_err(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from target in {} relay: {:?}",
relay_task_name,
error,
);
error
})
}
/// What to do with the on-demand relay task?
#[derive(Debug, PartialEq)]
enum OnDemandRelayAction {
Start,
Stop,
None,
}
fn select_on_demand_relay_action<C: Chain>(
best_finalized_source_header_at_source: Option<C::BlockNumber>,
best_finalized_source_header_at_target: Option<C::BlockNumber>,
mut required_source_header_at_target: C::BlockNumber,
maximal_headers_difference: C::BlockNumber,
relay_task_name: &str,
is_active: bool,
) -> OnDemandRelayAction {
// if we have been unable to read header number from the target, then let's assume
// that it is the same as required header number. Otherwise we risk submitting
// unneeded transactions
let best_finalized_source_header_at_target =
best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target);
// if we have been unable to read header number from the source, then let's assume
// that it is the same as at the target
let best_finalized_source_header_at_source =
best_finalized_source_header_at_source.unwrap_or(best_finalized_source_header_at_target);
// if there are too many source headers missing from the target node, require some
// new headers at target
//
// why do we need that? When complex headers+messages relay is used, it'll normally only relay
// headers when there are undelivered messages/confirmations. But security model of the
// `pallet-bridge-grandpa` module relies on the fact that headers are synced in real-time and
// that it'll see authorities-change header before unbonding period will end for previous
// authorities set.
let current_headers_difference = best_finalized_source_header_at_source
.checked_sub(&best_finalized_source_header_at_target)
.unwrap_or_else(Zero::zero);
if current_headers_difference > maximal_headers_difference {
required_source_header_at_target = best_finalized_source_header_at_source;
// don't log if relay is already running
if !is_active {
log::trace!(
target: "bridge",
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
C::NAME,
relay_task_name,
best_finalized_source_header_at_source,
best_finalized_source_header_at_target,
best_finalized_source_header_at_source,
);
}
}
// now let's select what to do with relay
let needs_to_be_active = required_source_header_at_target > best_finalized_source_header_at_target;
match (needs_to_be_active, is_active) {
(true, false) => OnDemandRelayAction::Start,
(false, true) => OnDemandRelayAction::Stop,
_ => OnDemandRelayAction::None,
}
}
/// On-demand headers relay task name.
fn on_demand_headers_relay_name<SourceChain: Chain, TargetChain: Chain>() -> String {
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
@@ -219,7 +368,7 @@ where
TargetSign: 'static,
{
let headers_relay_future =
crate::finality_pipeline::run(pipeline, source_client, target_client, MetricsParams::disabled());
crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled());
let closure_task_name = task_name.clone();
async_std::task::Builder::new()
.name(task_name.clone())
@@ -253,3 +402,52 @@ async fn stop_on_demand_headers_relay(task: Option<async_std::task::JoinHandle<(
log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
}
}
#[cfg(test)]
mod tests {
use super::*;
type TestChain = relay_millau_client::Millau;
const AT_SOURCE: Option<bp_millau::BlockNumber> = Some(10);
const AT_TARGET: Option<bp_millau::BlockNumber> = Some(1);
#[test]
fn starts_relay_when_headers_are_required() {
assert_eq!(
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", false),
OnDemandRelayAction::Start,
);
assert_eq!(
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 5, 100, "test", true),
OnDemandRelayAction::None,
);
}
#[test]
fn starts_relay_when_too_many_headers_missing() {
assert_eq!(
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", false),
OnDemandRelayAction::Start,
);
assert_eq!(
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, 0, 5, "test", true),
OnDemandRelayAction::None,
);
}
#[test]
fn stops_relay_if_required_header_is_synced() {
assert_eq!(
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true),
OnDemandRelayAction::Stop,
);
assert_eq!(
select_on_demand_relay_action::<TestChain>(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false),
OnDemandRelayAction::None,
);
}
}
@@ -10,8 +10,8 @@ bp-eth-poa = { path = "../../primitives/ethereum-poa" }
codec = { package = "parity-scale-codec", version = "2.0.0" }
headers-relay = { path = "../headers" }
hex-literal = "0.3"
jsonrpsee-proc-macros = "=0.2.0-alpha.5"
jsonrpsee-ws-client = "=0.2.0-alpha.5"
jsonrpsee-proc-macros = "=0.2.0-alpha.6"
jsonrpsee-ws-client = "=0.2.0-alpha.6"
libsecp256k1 = { version = "0.3.4", default-features = false, features = ["hmac"] }
log = "0.4.11"
relay-utils = { path = "../utils" }
@@ -9,8 +9,8 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
async-std = "1.6.5"
async-trait = "0.1.40"
codec = { package = "parity-scale-codec", version = "2.0.0" }
jsonrpsee-proc-macros = "=0.2.0-alpha.5"
jsonrpsee-ws-client = "=0.2.0-alpha.5"
jsonrpsee-proc-macros = "=0.2.0-alpha.6"
jsonrpsee-ws-client = "=0.2.0-alpha.6"
log = "0.4.11"
num-traits = "0.2"
rand = "0.7"
@@ -0,0 +1,23 @@
[package]
name = "relay-wococo-client"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
codec = { package = "parity-scale-codec", version = "2.0.0" }
headers-relay = { path = "../headers" }
relay-substrate-client = { path = "../client-substrate" }
relay-utils = { path = "../utils" }
# Bridge dependencies
bp-wococo = { path = "../../primitives/chain-wococo" }
# Substrate Dependencies
frame-system = { git = "https://github.com/paritytech/substrate", branch = "master" }
frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-transaction-payment = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -0,0 +1,97 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Types used to connect to the Wococo-Substrate chain.
use codec::Encode;
use relay_substrate_client::{Chain, ChainBase, ChainWithBalances, TransactionSignScheme};
use sp_core::{storage::StorageKey, Pair};
use sp_runtime::{generic::SignedPayload, traits::IdentifyAccount};
use std::time::Duration;
/// Wococo header id.
pub type HeaderId = relay_utils::HeaderId<bp_wococo::Hash, bp_wococo::BlockNumber>;
/// Wococo header type used in headers sync.
pub type SyncHeader = relay_substrate_client::SyncHeader<bp_wococo::Header>;
/// Wococo chain definition
#[derive(Debug, Clone, Copy)]
pub struct Wococo;
impl ChainBase for Wococo {
type BlockNumber = bp_wococo::BlockNumber;
type Hash = bp_wococo::Hash;
type Hasher = bp_wococo::Hashing;
type Header = bp_wococo::Header;
}
impl Chain for Wococo {
const NAME: &'static str = "Wococo";
const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_secs(6);
type AccountId = bp_wococo::AccountId;
type Index = bp_wococo::Index;
type SignedBlock = bp_wococo::SignedBlock;
type Call = bp_wococo::Call;
}
impl ChainWithBalances for Wococo {
type NativeBalance = bp_wococo::Balance;
fn account_info_storage_key(account_id: &Self::AccountId) -> StorageKey {
StorageKey(bp_wococo::account_info_storage_key(account_id))
}
}
impl TransactionSignScheme for Wococo {
type Chain = Wococo;
type AccountKeyPair = sp_core::sr25519::Pair;
type SignedTransaction = bp_wococo::UncheckedExtrinsic;
fn sign_transaction(
genesis_hash: <Self::Chain as ChainBase>::Hash,
signer: &Self::AccountKeyPair,
signer_nonce: <Self::Chain as Chain>::Index,
call: <Self::Chain as Chain>::Call,
) -> Self::SignedTransaction {
let raw_payload = SignedPayload::new(
call,
bp_wococo::SignedExtensions::new(
bp_wococo::VERSION,
sp_runtime::generic::Era::Immortal,
genesis_hash,
signer_nonce,
0,
),
)
.expect("SignedExtension never fails.");
let signature = raw_payload.using_encoded(|payload| signer.sign(payload));
let signer: sp_runtime::MultiSigner = signer.public().into();
let (call, extra, _) = raw_payload.deconstruct();
bp_wococo::UncheckedExtrinsic::new_signed(
call,
sp_runtime::MultiAddress::Id(signer.into_account()),
signature.into(),
extra,
)
}
}
/// Wococo signing params.
pub type SigningParams = sp_core::sr25519::Pair;
@@ -26,7 +26,7 @@ use std::{
};
/// Transaction proof pipeline.
pub trait TransactionProofPipeline {
pub trait TransactionProofPipeline: 'static {
/// Name of the transaction proof source.
const SOURCE_NAME: &'static str;
/// Name of the transaction proof target.
@@ -35,18 +35,21 @@ pub trait TransactionProofPipeline {
/// Block type.
type Block: SourceBlock;
/// Transaction inclusion proof type.
type TransactionProof;
type TransactionProof: 'static + Send + Sync;
}
/// Block that is participating in exchange.
pub trait SourceBlock {
pub trait SourceBlock: 'static + Send + Sync {
/// Block hash type.
type Hash: Clone + Debug + Display;
type Hash: 'static + Clone + Send + Sync + Debug + Display;
/// Block number type.
type Number: Debug
type Number: 'static
+ Debug
+ Display
+ Clone
+ Copy
+ Send
+ Sync
+ Into<u64>
+ std::cmp::Ord
+ std::ops::Add<Output = Self::Number>
@@ -61,7 +64,7 @@ pub trait SourceBlock {
}
/// Transaction that is participating in exchange.
pub trait SourceTransaction {
pub trait SourceTransaction: 'static + Send {
/// Transaction hash type.
type Hash: Debug + Display;
@@ -39,9 +39,9 @@ pub struct TransactionProofsRelayState<BlockNumber> {
}
/// Transactions proofs relay storage.
pub trait TransactionProofsRelayStorage: Clone {
pub trait TransactionProofsRelayStorage: 'static + Clone + Send + Sync {
/// Associated block number.
type BlockNumber;
type BlockNumber: 'static + Send + Sync;
/// Get relay state.
fn state(&self) -> TransactionProofsRelayState<Self::BlockNumber>;
@@ -64,7 +64,7 @@ impl<BlockNumber> InMemoryStorage<BlockNumber> {
}
}
impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorage<BlockNumber> {
impl<BlockNumber: 'static + Clone + Copy + Send + Sync> TransactionProofsRelayStorage for InMemoryStorage<BlockNumber> {
type BlockNumber = BlockNumber;
fn state(&self) -> TransactionProofsRelayState<BlockNumber> {
@@ -89,7 +89,7 @@ pub async fn run<P: TransactionProofPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
@@ -99,7 +99,7 @@ pub async fn run<P: TransactionProofPipeline>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
storage.clone(),
source_client,
@@ -117,7 +117,7 @@ async fn run_until_connection_lost<P: TransactionProofPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_exch: Option<ExchangeLoopMetrics>,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient> {
let mut retry_backoff = retry_backoff();
let mut state = storage.state();
@@ -39,6 +39,8 @@ use std::{
/// Finality proof synchronization loop parameters.
#[derive(Debug, Clone)]
pub struct FinalitySyncParams {
/// If `true`, then the separate async task for running finality loop is NOT spawned.
pub is_on_demand_task: bool,
/// Interval at which we check updates on both clients. Normally should be larger than
/// `min(source_block_time, target_block_time)`.
///
@@ -65,7 +67,7 @@ pub struct FinalitySyncParams {
pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
/// Stream of new finality proofs. The stream is allowed to miss proofs for some
/// headers, even if those headers are mandatory.
type FinalityProofsStream: Stream<Item = P::FinalityProof>;
type FinalityProofsStream: Stream<Item = P::FinalityProof> + Send;
/// Get best finalized block number.
async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;
@@ -101,16 +103,17 @@ pub async fn run<P: FinalitySyncPipeline>(
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.spawn_loop_task(!sync_params.is_on_demand_task)
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
@@ -106,7 +106,7 @@ impl RelayClient for TestSourceClient {
#[async_trait]
impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof>>>;
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof> + 'static + Send>>;
async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock();
@@ -197,6 +197,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
data: clients_data.clone(),
};
let sync_params = FinalitySyncParams {
is_on_demand_task: false,
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
+1 -1
View File
@@ -28,7 +28,7 @@ mod finality_loop;
mod finality_loop_tests;
/// Finality proofs synchronization pipeline.
pub trait FinalitySyncPipeline: Clone + Debug + Send + Sync {
pub trait FinalitySyncPipeline: 'static + Clone + Debug + Send + Sync {
/// Name of the finality proofs source.
const SOURCE_NAME: &'static str;
/// Name of the finality proofs target.
@@ -102,7 +102,7 @@ pub trait TargetClient<P: HeadersSyncPipeline>: RelayClient {
/// Synchronization maintain procedure.
#[async_trait]
pub trait SyncMaintain<P: HeadersSyncPipeline>: Clone + Send + Sync {
pub trait SyncMaintain<P: HeadersSyncPipeline>: 'static + Clone + Send + Sync {
/// Run custom maintain procedures. This is guaranteed to be called when both source and target
/// clients are unoccupied.
async fn maintain(&self, _sync: &mut HeadersSync<P>) {}
@@ -125,7 +125,7 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
@@ -134,7 +134,7 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
source_tick,
@@ -159,7 +159,7 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams,
metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient> {
let mut progress_context = (Instant::now(), None, None);
@@ -43,7 +43,7 @@ pub enum HeaderStatus {
}
/// Headers synchronization pipeline.
pub trait HeadersSyncPipeline: Clone + Send + Sync {
pub trait HeadersSyncPipeline: 'static + Clone + Send + Sync {
/// Name of the headers source.
const SOURCE_NAME: &'static str;
/// Name of the headers target.
@@ -23,7 +23,7 @@ use relay_utils::{BlockNumberBase, HeaderId};
use std::fmt::Debug;
/// One-way message lane.
pub trait MessageLane: Clone + Send + Sync {
pub trait MessageLane: 'static + Clone + Send + Sync {
/// Name of the messages source.
const SOURCE_NAME: &'static str;
/// Name of the messages target.
@@ -227,7 +227,7 @@ pub async fn run<P: MessageLane>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
@@ -237,15 +237,18 @@ pub async fn run<P: MessageLane>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose()
.await?
.run(|source_client, target_client, metrics| {
run_until_connection_lost(
params.clone(),
source_client,
target_client,
metrics,
exit_signal.clone(),
)
})
.run(
metrics_prefix::<P>(&params.lane),
move |source_client, target_client, metrics| {
run_until_connection_lost(
params.clone(),
source_client,
target_client,
metrics,
exit_signal.clone(),
)
},
)
.await
}
@@ -579,6 +582,9 @@ pub(crate) mod tests {
) -> Result<(), TestError> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
data.submitted_messages_receiving_proofs.push(proof);
data.source_latest_confirmed_received_nonce = proof;
Ok(())
@@ -681,6 +687,7 @@ pub(crate) mod tests {
}
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
data.target_latest_received_nonce = *proof.0.end();
if let Some(target_latest_confirmed_received_nonce) = proof.1 {
data.target_latest_confirmed_received_nonce = target_latest_confirmed_received_nonce;
@@ -701,7 +708,7 @@ pub(crate) mod tests {
data: TestClientData,
source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
exit_signal: impl Future<Output = ()>,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> TestClientData {
async_std::task::block_on(async {
let data = Arc::new(Mutex::new(data));
@@ -809,37 +816,37 @@ pub(crate) mod tests {
..Default::default()
},
Arc::new(|data: &mut TestClientData| {
// blocks are produced on every tick
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
// headers relay must only be started when we need new target headers at source node
if data.target_to_source_header_required.is_some() {
assert!(data.source_state.best_finalized_peer_at_best_self.0 < data.target_state.best_self.0);
data.target_to_source_header_required = None;
}
// syncing target headers -> source chain
if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
if *last_requirement != data.source_state.best_finalized_peer_at_best_self {
data.source_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
}),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
// headers relay must only be started when we need new source headers at target node
if data.source_to_target_header_required.is_some() {
assert!(data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_self.0);
data.source_to_target_header_required = None;
}
// syncing source headers -> target chain (all at once)
if data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_finalized_self.0 {
data.target_state.best_finalized_peer_at_best_self = data.source_state.best_finalized_self;
}
// syncing source headers -> target chain (all at once)
if data.source_state.best_finalized_peer_at_best_self.0 < data.target_state.best_finalized_self.0 {
data.source_state.best_finalized_peer_at_best_self = data.target_state.best_finalized_self;
}
// if target has received messages batch => increase blocks so that confirmations may be sent
if data.target_latest_received_nonce == 4
|| data.target_latest_received_nonce == 8
|| data.target_latest_received_nonce == 10
{
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.0 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
// syncing source headers -> target chain
if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
if *last_requirement != data.target_state.best_finalized_peer_at_best_self {
data.target_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
// if source has received all messages receiving confirmations => stop
if data.source_latest_confirmed_received_nonce == 10 {
@@ -292,7 +292,16 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
}
fn required_source_header_at_target(&self, current_best: &SourceHeaderIdOf<P>) -> Option<SourceHeaderIdOf<P>> {
self.strategy.required_source_header_at_target(current_best)
let header_required_for_messages_delivery = self.strategy.required_source_header_at_target(current_best);
let header_required_for_reward_confirmations_delivery =
self.latest_confirmed_nonces_at_source.back().map(|(id, _)| id.clone());
match (
header_required_for_messages_delivery,
header_required_for_reward_confirmations_delivery,
) {
(Some(id1), Some(id2)) => Some(if id1.0 > id2.0 { id1 } else { id2 }),
(a, b) => a.or(b),
}
}
fn best_at_source(&self) -> Option<MessageNonce> {
@@ -876,4 +885,46 @@ mod tests {
Some(((20..=23), proof_parameters(true, 4)))
);
}
#[test]
fn source_header_is_requied_when_confirmations_are_required() {
// let's prepare situation when:
// - all messages [20; 23] have been generated at source block#1;
let (mut state, mut strategy) = prepare_strategy();
// - messages [20; 21] have been delivered, but messages [11; 20] can't be delivered because of unrewarded
// relayers vector capacity;
strategy.max_unconfirmed_nonces_at_target = 2;
assert_eq!(
strategy.select_nonces_to_deliver(&state),
Some(((20..=21), proof_parameters(false, 2)))
);
strategy.finalized_target_nonces_updated(
TargetClientNonces {
latest_nonce: 21,
nonces_data: DeliveryRaceTargetNoncesData {
confirmed_nonce: 19,
unrewarded_relayers: UnrewardedRelayersState {
unrewarded_relayer_entries: 2,
messages_in_oldest_entry: 2,
total_messages: 2,
},
},
},
&mut state,
);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
// - messages [1; 10] receiving confirmation has been delivered at source block#2;
strategy.source_nonces_updated(
header_id(2),
SourceClientNonces {
new_nonces: BTreeMap::new(),
confirmed_nonce: Some(21),
},
);
// - so now we'll need to relay source block#11 to be able to accept messages [11; 20].
assert_eq!(
strategy.required_source_header_at_target(&header_id(1)),
Some(header_id(2))
);
}
}
@@ -16,7 +16,11 @@
//! Relayer initialization functions.
use std::{fmt::Display, io::Write};
use std::{cell::RefCell, fmt::Display, io::Write};
async_std::task_local! {
pub(crate) static LOOP_NAME: RefCell<String> = RefCell::new(String::default());
}
/// Initialize relay environment.
pub fn initialize_relay() {
@@ -43,20 +47,56 @@ pub fn initialize_logger(with_timestamp: bool) {
Either::Right(ansi_term::Colour::Fixed(8).bold().paint(timestamp))
};
writeln!(buf, "{} {} {} {}", timestamp, log_level, log_target, record.args(),)
writeln!(
buf,
"{}{} {} {} {}",
loop_name_prefix(),
timestamp,
log_level,
log_target,
record.args(),
)
});
} else {
builder.format(move |buf, record| {
let log_level = color_level(record.level());
let log_target = color_target(record.target());
writeln!(buf, "{} {} {}", log_level, log_target, record.args(),)
writeln!(
buf,
"{}{} {} {}",
loop_name_prefix(),
log_level,
log_target,
record.args(),
)
});
}
builder.init();
}
/// Initialize relay loop. Must only be called once per every loop task.
pub(crate) fn initialize_loop(loop_name: String) {
LOOP_NAME.with(|g_loop_name| *g_loop_name.borrow_mut() = loop_name);
}
/// Returns loop name prefix to use in logs. The prefix is initialized with the `initialize_loop` call.
fn loop_name_prefix() -> String {
// try_with to avoid panic outside of async-std task context
LOOP_NAME
.try_with(|loop_name| {
// using borrow is ok here, because loop is only initialized once (=> borrow_mut will only be called once)
let loop_name = loop_name.borrow();
if loop_name.is_empty() {
String::new()
} else {
format!("[{}] ", loop_name)
}
})
.unwrap_or_else(|_| String::new())
}
enum Either<A, B> {
Left(A),
Right(B),
+94 -52
View File
@@ -26,9 +26,9 @@ pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Basic blockchain client from relay perspective.
#[async_trait]
pub trait Client: Clone + Send + Sync {
pub trait Client: 'static + Clone + Send + Sync {
/// Type of error this clients returns.
type Error: Debug + MaybeConnectionError;
type Error: 'static + Debug + MaybeConnectionError + Send + Sync;
/// Try to reconnect to source node.
async fn reconnect(&mut self) -> Result<(), Self::Error>;
@@ -38,6 +38,7 @@ pub trait Client: Clone + Send + Sync {
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
Loop {
reconnect_delay: RECONNECT_DELAY,
spawn_loop_task: true,
source_client,
target_client,
loop_metric: None,
@@ -49,6 +50,7 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
spawn_loop_task: true,
source_client: (),
target_client: (),
loop_metric: None,
@@ -63,6 +65,7 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
/// Generic relay loop.
pub struct Loop<SC, TC, LM> {
reconnect_delay: Duration,
spawn_loop_task: bool,
source_client: SC,
target_client: TC,
loop_metric: Option<LM>,
@@ -84,11 +87,23 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
self
}
/// Set spawn-dedicated-loop-task flag.
///
/// If `true` (default), separate async task is spawned to run relay loop. This is the default
/// behavior for all loops. If `false`, then loop is executed as a part of the current
/// task. The `false` is used for on-demand tasks, which are cancelled from time to time
/// and there's already a dedicated on-demand task for running such loops.
pub fn spawn_loop_task(mut self, spawn_loop_task: bool) -> Self {
self.spawn_loop_task = spawn_loop_task;
self
}
/// Start building loop metrics using given prefix.
pub fn with_metrics(self, prefix: Option<String>, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: self.reconnect_delay,
spawn_loop_task: self.spawn_loop_task,
source_client: self.source_client,
target_client: self.target_client,
loop_metric: None,
@@ -105,63 +120,47 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
/// This function represents an outer loop, which in turn calls provided `run_loop` function to do
/// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
/// target or both) and calls `run_loop` again.
pub async fn run<R, F>(mut self, run_loop: R) -> Result<(), String>
pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), String>
where
R: Fn(SC, TC, Option<LM>) -> F,
F: Future<Output = Result<(), FailedClient>>,
SC: Client,
TC: Client,
LM: Clone,
R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
F: 'static + Send + Future<Output = Result<(), FailedClient>>,
SC: 'static + Client,
TC: 'static + Client,
LM: 'static + Send + Clone,
{
loop {
let result = run_loop(
self.source_client.clone(),
self.target_client.clone(),
self.loop_metric.clone(),
)
.await;
let spawn_loop_task = self.spawn_loop_task;
let run_loop_task = async move {
crate::initialize::initialize_loop(loop_name);
match result {
Ok(()) => break,
Err(failed_client) => loop {
async_std::task::sleep(self.reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match self.source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
self.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match self.target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
self.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
loop {
let loop_metric = self.loop_metric.clone();
let future_result = run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric);
let result = future_result.await;
break;
},
match result {
Ok(()) => break,
Err(failed_client) => {
reconnect_failed_client(
failed_client,
self.reconnect_delay,
&mut self.source_client,
&mut self.target_client,
)
.await
}
}
log::debug!(target: "bridge", "Restarting relay loop");
}
log::debug!(target: "bridge", "Restarting relay loop");
}
Ok(())
};
Ok(())
if spawn_loop_task {
async_std::task::spawn(run_loop_task).await
} else {
run_loop_task.await
}
}
}
@@ -237,6 +236,7 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
Ok(Loop {
reconnect_delay: self.relay_loop.reconnect_delay,
spawn_loop_task: self.relay_loop.spawn_loop_task,
source_client: self.relay_loop.source_client,
target_client: self.relay_loop.target_client,
loop_metric: self.loop_metric,
@@ -244,6 +244,48 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
}
}
/// Deal with the client who has returned connection error.
pub async fn reconnect_failed_client(
failed_client: FailedClient,
reconnect_delay: Duration,
source_client: &mut impl Client,
target_client: &mut impl Client,
) {
loop {
async_std::task::sleep(reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
break;
}
}
/// Create new registry with global metrics.
fn create_metrics_registry(prefix: Option<String>) -> Registry {
match prefix {