Extract (headers, exchange, messages) relay loops into separate crates (#357)

* extracted relay crates

* moved metrics to utils

* exchange-relay compilation

* fix compilation of headers-relay

* fixed messages-relay compilation

* fixed ethereum-poa-relay compilation

* cargo lock

* cargo fmt --all

* clippy

* cargo fmt --all

* fix tests compilation

* clippy

* eof

* module level docs

* removed obsolete comment

* #![warn(missing_docs)]

* .0 -> Deref

* post-merge fix

* cargo fmt

* Update relays/headers-relay/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/headers-relay/src/headers.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update relays/headers-relay/src/lib.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-09-18 10:34:08 +03:00
committed by Bastian Köcher
parent fa2abfb140
commit d614cdaba8
35 changed files with 435 additions and 241 deletions
+4 -6
View File
@@ -8,9 +8,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
ansi_term = "0.12"
async-std = "1.6.2"
async-stream = "0.3.0"
async-trait = "0.1.40"
backoff = "0.2"
bp-currency-exchange = { path = "../../primitives/currency-exchange" }
bp-eth-poa = { path = "../../primitives/ethereum-poa" }
clap = { version = "2.33.3", features = ["yaml"] }
@@ -20,18 +18,18 @@ ethabi = "12.0"
ethabi-contract = "11.0"
ethabi-derive = "12.0"
ethereum-tx-sign = "3.0"
exchange-relay = { path = "../exchange-relay" }
futures = "0.3.5"
headers-relay = { path = "../headers-relay" }
hex = "0.4"
hex-literal = "0.3"
linked-hash-map = "0.5.3"
log = "0.4.11"
messages-relay = { path = "../messages-relay" }
num-traits = "0.2"
parity-crypto = { version = "0.6", features = ["publickey"] }
parking_lot = "0.11.0"
rustc-hex = "2.0.1"
relay-utils = { path = "../utils" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.57"
sysinfo = "0.15"
time = "0.2"
web3 = "0.13"
+16 -14
View File
@@ -21,17 +21,16 @@ use crate::ethereum_types::{
use crate::rpc::{Ethereum, EthereumRpc};
use crate::rpc_errors::{EthereumNodeError, RpcError};
use crate::substrate_types::{GrandpaJustification, Hash as SubstrateHash, QueuedSubstrateHeader, SubstrateHeaderId};
use crate::sync_types::SubmittedHeaders;
use crate::utils::{HeaderId, MaybeConnectionError};
use async_trait::async_trait;
use codec::{Decode, Encode};
use ethabi::FunctionOutputDecoder;
use headers_relay::sync_types::SubmittedHeaders;
use jsonrpsee::raw::RawClient;
use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client;
use parity_crypto::publickey::KeyPair;
use relay_utils::{HeaderId, MaybeConnectionError};
use std::collections::HashSet;
// to encode/decode contract calls
@@ -693,17 +692,20 @@ mod tests {
}
fn header(number: SubstrateBlockNumber) -> QueuedSubstrateHeader {
QueuedSubstrateHeader::new(SubstrateHeader::new(
number,
Default::default(),
Default::default(),
if number == 0 {
Default::default()
} else {
header(number - 1).id().1
},
Default::default(),
))
QueuedSubstrateHeader::new(
SubstrateHeader::new(
number,
Default::default(),
Default::default(),
if number == 0 {
Default::default()
} else {
header(number - 1).id().1
},
Default::default(),
)
.into(),
)
}
#[test]
@@ -21,10 +21,10 @@ use crate::instances::BridgeInstance;
use crate::rpc::SubstrateRpc;
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
use crate::substrate_types::{Hash as SubstrateHash, Header as SubstrateHeader, SubstrateHeaderId};
use crate::utils::HeaderId;
use codec::{Decode, Encode};
use num_traits::Zero;
use relay_utils::HeaderId;
/// Ethereum synchronization parameters.
#[derive(Debug)]
@@ -21,23 +21,22 @@ use crate::ethereum_types::{
EthereumHeaderId, HeaderWithTransactions as EthereumHeaderWithTransactions, Transaction as EthereumTransaction,
TransactionHash as EthereumTransactionHash, H256,
};
use crate::exchange::{
relay_single_transaction_proof, SourceBlock, SourceClient, SourceTransaction, TargetClient,
TransactionProofPipeline,
};
use crate::exchange_loop::{run as run_loop, InMemoryStorage};
use crate::instances::BridgeInstance;
use crate::metrics::MetricsParams;
use crate::rpc::{EthereumRpc, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_client::{
SubmitEthereumExchangeTransactionProof, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams,
};
use crate::substrate_types::into_substrate_ethereum_receipt;
use crate::utils::HeaderId;
use async_trait::async_trait;
use bp_currency_exchange::MaybeLockFundsTransaction;
use exchange_relay::exchange::{
relay_single_transaction_proof, SourceBlock, SourceClient, SourceTransaction, TargetClient,
TransactionProofPipeline,
};
use exchange_relay::exchange_loop::{run as run_loop, InMemoryStorage};
use relay_utils::{metrics::MetricsParams, HeaderId};
use rialto_runtime::exchange::EthereumTransactionInclusionProof;
use std::time::Duration;
@@ -17,20 +17,24 @@
//! Ethereum PoA -> Substrate synchronization.
use crate::ethereum_client::{EthereumConnectionParams, EthereumHighLevelRpc, EthereumRpcClient};
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt};
use crate::ethereum_types::{
EthereumHeaderId, EthereumHeadersSyncPipeline, EthereumSyncHeader as Header, QueuedEthereumHeader, Receipt,
};
use crate::instances::BridgeInstance;
use crate::metrics::MetricsParams;
use crate::rpc::{EthereumRpc, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_client::{
SubmitEthereumHeaders, SubstrateConnectionParams, SubstrateRpcClient, SubstrateSigningParams,
};
use crate::substrate_types::into_substrate_ethereum_header;
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_types::{SourceHeader, SubmittedHeaders};
use async_trait::async_trait;
use headers_relay::{
sync::{HeadersSyncParams, TargetTransactionMode},
sync_loop::{SourceClient, TargetClient},
sync_types::{SourceHeader, SubmittedHeaders},
};
use relay_utils::metrics::MetricsParams;
use web3::types::H256;
use std::fmt::Debug;
@@ -95,11 +99,11 @@ impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
}
async fn header_by_hash(&self, hash: H256) -> Result<Header, Self::Error> {
self.client.header_by_hash(hash).await
self.client.header_by_hash(hash).await.map(Into::into)
}
async fn header_by_number(&self, number: u64) -> Result<Header, Self::Error> {
self.client.header_by_number(number).await
self.client.header_by_number(number).await.map(Into::into)
}
async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), Self::Error> {
@@ -199,7 +203,7 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> {
let source = EthereumHeadersSource::new(eth_client);
let target = SubstrateHeadersTarget::new(sub_client, sign_sub_transactions, sub_sign);
crate::sync_loop::run(
headers_relay::sync_loop::run(
source,
consts::ETHEREUM_TICK_INTERVAL,
target,
+22 -4
View File
@@ -15,9 +15,10 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
use crate::utils::HeaderId;
use codec::Encode;
use headers_relay::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
use relay_utils::HeaderId;
pub use web3::types::{Address, Bytes, CallRequest, H256, U128, U256, U64};
@@ -34,6 +35,17 @@ pub type Transaction = web3::types::Transaction;
/// Ethereum header type.
pub type Header = web3::types::Block<H256>;
/// Ethereum header type used in headers sync.
#[derive(Clone, Debug, PartialEq)]
pub struct EthereumSyncHeader(Header);
impl std::ops::Deref for EthereumSyncHeader {
type Target = Header;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Ethereum header with transactions type.
pub type HeaderWithTransactions = web3::types::Block<Transaction>;
@@ -60,7 +72,7 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {
type Hash = H256;
type Number = u64;
type Header = Header;
type Header = EthereumSyncHeader;
type Extra = Vec<Receipt>;
type Completion = ();
@@ -72,7 +84,13 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {
}
}
impl SourceHeader<H256, u64> for Header {
impl From<Header> for EthereumSyncHeader {
fn from(header: Header) -> Self {
Self(header)
}
}
impl SourceHeader<H256, u64> for EthereumSyncHeader {
fn id(&self) -> EthereumHeaderId {
HeaderId(
self.number.expect(HEADER_ID_PROOF).as_u64(),
+2 -2
View File
@@ -53,7 +53,7 @@ impl BridgeInstance for Rialto {
.into_iter()
.map(|header| {
(
into_substrate_ethereum_header(header.header()),
into_substrate_ethereum_header(&header.header()),
into_substrate_ethereum_receipts(header.extra()),
)
})
@@ -65,7 +65,7 @@ impl BridgeInstance for Rialto {
fn build_unsigned_header_call(&self, header: QueuedEthereumHeader) -> Call {
let pallet_call = rialto_runtime::BridgeEthPoACall::import_unsigned_header(
into_substrate_ethereum_header(header.header()),
into_substrate_ethereum_header(&header.header()),
into_substrate_ethereum_receipts(header.extra()),
);
+29 -25
View File
@@ -22,42 +22,28 @@ mod ethereum_exchange;
mod ethereum_exchange_submit;
mod ethereum_sync_loop;
mod ethereum_types;
mod exchange;
mod exchange_loop;
mod exchange_loop_metrics;
mod headers;
mod instances;
mod message_lane;
mod message_lane_loop;
mod message_race_delivery;
mod message_race_loop;
mod message_race_receiving;
mod metrics;
mod rpc;
mod rpc_errors;
mod substrate_client;
mod substrate_sync_loop;
mod substrate_types;
mod sync;
mod sync_loop;
mod sync_loop_metrics;
mod sync_loop_tests;
mod sync_types;
mod utils;
use ethereum_client::{EthereumConnectionParams, EthereumSigningParams};
use ethereum_deploy_contract::EthereumDeployContractParams;
use ethereum_exchange::EthereumExchangeParams;
use ethereum_exchange_submit::EthereumExchangeSubmitParams;
use ethereum_sync_loop::EthereumSyncParams;
use headers_relay::sync::TargetTransactionMode;
use hex_literal::hex;
use instances::{BridgeInstance, Kovan, Rialto};
use parity_crypto::publickey::{KeyPair, Secret};
use relay_utils::metrics::MetricsParams;
use sp_core::crypto::Pair;
use substrate_client::{SubstrateConnectionParams, SubstrateSigningParams};
use substrate_sync_loop::SubstrateSyncParams;
use sync::HeadersSyncParams;
use headers_relay::sync::HeadersSyncParams;
use std::io::Write;
fn main() {
@@ -223,19 +209,28 @@ fn substrate_signing_params(matches: &clap::ArgMatches) -> Result<SubstrateSigni
}
fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<EthereumSyncParams, String> {
let mut sync_params = HeadersSyncParams::ethereum_sync_default();
use crate::ethereum_sync_loop::consts::*;
let mut sync_params = HeadersSyncParams {
max_future_headers_to_download: MAX_FUTURE_HEADERS_TO_DOWNLOAD,
max_headers_in_submitted_status: MAX_SUBMITTED_HEADERS,
max_headers_in_single_submit: MAX_HEADERS_IN_SINGLE_SUBMIT,
max_headers_size_in_single_submit: MAX_HEADERS_SIZE_IN_SINGLE_SUBMIT,
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
};
match matches.value_of("sub-tx-mode") {
Some("signed") => sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
Some("signed") => sync_params.target_tx_mode = TargetTransactionMode::Signed,
Some("unsigned") => {
sync_params.target_tx_mode = sync::TargetTransactionMode::Unsigned;
sync_params.target_tx_mode = TargetTransactionMode::Unsigned;
// tx pool won't accept too much unsigned transactions
sync_params.max_headers_in_submitted_status = 10;
}
Some("backup") => sync_params.target_tx_mode = sync::TargetTransactionMode::Backup,
Some("backup") => sync_params.target_tx_mode = TargetTransactionMode::Backup,
Some(mode) => return Err(format!("Invalid sub-tx-mode: {}", mode)),
None => sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
None => sync_params.target_tx_mode = TargetTransactionMode::Signed,
}
let params = EthereumSyncParams {
@@ -253,6 +248,8 @@ fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<EthereumSyncParams
}
fn substrate_sync_params(matches: &clap::ArgMatches) -> Result<SubstrateSyncParams, String> {
use crate::substrate_sync_loop::consts::*;
let eth_contract_address: ethereum_types::Address = if let Some(eth_contract) = matches.value_of("eth-contract") {
eth_contract.parse().map_err(|e| format!("{}", e))?
} else {
@@ -267,7 +264,14 @@ fn substrate_sync_params(matches: &clap::ArgMatches) -> Result<SubstrateSyncPara
eth_sign: ethereum_signing_params(matches)?,
metrics_params: metrics_params(matches)?,
instance: instance_params(matches)?,
sync_params: HeadersSyncParams::substrate_sync_default(),
sync_params: HeadersSyncParams {
max_future_headers_to_download: MAX_FUTURE_HEADERS_TO_DOWNLOAD,
max_headers_in_submitted_status: MAX_SUBMITTED_HEADERS,
max_headers_in_single_submit: 4,
max_headers_size_in_single_submit: std::usize::MAX,
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
},
eth_contract_address,
};
@@ -388,12 +392,12 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result<EthereumExchan
Ok(params)
}
fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<metrics::MetricsParams>, String> {
fn metrics_params(matches: &clap::ArgMatches) -> Result<Option<MetricsParams>, String> {
if matches.is_present("no-prometheus") {
return Ok(None);
}
let mut metrics_params = metrics::MetricsParams::default();
let mut metrics_params = MetricsParams::default();
if let Some(prometheus_host) = matches.value_of("prometheus-host") {
metrics_params.host = prometheus_host.into();
+1 -2
View File
@@ -14,9 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::utils::MaybeConnectionError;
use jsonrpsee::client::RequestError;
use relay_utils::MaybeConnectionError;
/// Contains common errors that can occur when
/// interacting with a Substrate or Ethereum node
@@ -19,16 +19,16 @@ use crate::instances::BridgeInstance;
use crate::rpc::{Substrate, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_types::{Hash, Header as SubstrateHeader, Number, SignedBlock as SignedSubstrateBlock};
use crate::sync_types::SubmittedHeaders;
use crate::utils::HeaderId;
use async_trait::async_trait;
use bp_eth_poa::AuraHeader as SubstrateEthereumHeader;
use codec::{Decode, Encode};
use headers_relay::sync_types::SubmittedHeaders;
use jsonrpsee::raw::RawClient;
use jsonrpsee::transport::http::HttpTransportClient;
use jsonrpsee::Client;
use num_traits::Zero;
use relay_utils::HeaderId;
use sp_core::crypto::Pair;
use sp_runtime::traits::IdentifyAccount;
use std::collections::VecDeque;
@@ -21,18 +21,21 @@ use crate::ethereum_client::{
};
use crate::ethereum_types::Address;
use crate::instances::BridgeInstance;
use crate::metrics::MetricsParams;
use crate::rpc::SubstrateRpc;
use crate::rpc_errors::RpcError;
use crate::substrate_client::{SubstrateConnectionParams, SubstrateRpcClient};
use crate::substrate_types::{
GrandpaJustification, Hash, Header, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline,
GrandpaJustification, Hash, Number, QueuedSubstrateHeader, SubstrateHeaderId, SubstrateHeadersSyncPipeline,
SubstrateSyncHeader as Header,
};
use crate::sync::HeadersSyncParams;
use crate::sync_loop::{SourceClient, TargetClient};
use crate::sync_types::{SourceHeader, SubmittedHeaders};
use async_trait::async_trait;
use headers_relay::{
sync::HeadersSyncParams,
sync_loop::{SourceClient, TargetClient},
sync_types::{SourceHeader, SubmittedHeaders},
};
use relay_utils::metrics::MetricsParams;
use std::fmt::Debug;
use std::{collections::HashSet, time::Duration};
@@ -92,11 +95,11 @@ impl SourceClient<SubstrateHeadersSyncPipeline> for SubstrateHeadersSource {
}
async fn header_by_hash(&self, hash: Hash) -> Result<Header, Self::Error> {
self.client.header_by_hash(hash).await
self.client.header_by_hash(hash).await.map(Into::into)
}
async fn header_by_number(&self, number: Number) -> Result<Header, Self::Error> {
self.client.header_by_number(number).await
self.client.header_by_number(number).await.map(Into::into)
}
async fn header_completion(
@@ -197,7 +200,7 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> {
let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign);
let source = SubstrateHeadersSource::new(sub_client);
crate::sync_loop::run(
headers_relay::sync_loop::run(
source,
consts::SUBSTRATE_TICK_INTERVAL,
target,
+21 -4
View File
@@ -17,10 +17,10 @@
use crate::ethereum_types::{
Header as EthereumHeader, Receipt as EthereumReceipt, HEADER_ID_PROOF as ETHEREUM_HEADER_ID_PROOF,
};
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
use crate::utils::HeaderId;
use codec::Encode;
use headers_relay::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader};
use relay_utils::HeaderId;
pub use bp_eth_poa::{
Address, AuraHeader as SubstrateEthereumHeader, Bloom, Bytes, LogEntry as SubstrateEthereumLogEntry,
@@ -36,6 +36,17 @@ pub type Number = rialto_runtime::BlockNumber;
/// Substrate header type.
pub type Header = rialto_runtime::Header;
/// Substrate header type used in headers sync.
#[derive(Clone, Debug, PartialEq)]
pub struct SubstrateSyncHeader(Header);
impl std::ops::Deref for SubstrateSyncHeader {
type Target = Header;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Substrate signed block type.
pub type SignedBlock = rialto_runtime::SignedBlock;
@@ -59,7 +70,7 @@ impl HeadersSyncPipeline for SubstrateHeadersSyncPipeline {
type Hash = rialto_runtime::Hash;
type Number = rialto_runtime::BlockNumber;
type Header = Header;
type Header = SubstrateSyncHeader;
type Extra = ();
type Completion = GrandpaJustification;
@@ -68,7 +79,13 @@ impl HeadersSyncPipeline for SubstrateHeadersSyncPipeline {
}
}
impl SourceHeader<rialto_runtime::Hash, rialto_runtime::BlockNumber> for Header {
impl From<Header> for SubstrateSyncHeader {
fn from(header: Header) -> Self {
Self(header)
}
}
impl SourceHeader<rialto_runtime::Hash, rialto_runtime::BlockNumber> for SubstrateSyncHeader {
fn id(&self) -> SubstrateHeaderId {
HeaderId(self.number, self.hash())
}
+16
View File
@@ -0,0 +1,16 @@
[package]
name = "exchange-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
async-std = "1.6.2"
async-trait = "0.1.40"
backoff = "0.2"
futures = "0.3.5"
log = "0.4.11"
num-traits = "0.2"
parking_lot = "0.11.0"
relay-utils = { path = "../utils" }
@@ -16,9 +16,8 @@
//! Relaying proofs of exchange transaction.
use crate::utils::{MaybeConnectionError, StringifiedMaybeConnectionError};
use async_trait::async_trait;
use relay_utils::{MaybeConnectionError, StringifiedMaybeConnectionError};
use std::{
fmt::{Debug, Display},
string::ToString,
@@ -54,7 +53,7 @@ pub trait SourceBlock {
type Transaction: SourceTransaction;
/// Return hash of the block.
fn id(&self) -> crate::utils::HeaderId<Self::Hash, Self::Number>;
fn id(&self) -> relay_utils::HeaderId<Self::Hash, Self::Number>;
/// Return block transactions iterator.
fn transactions(&self) -> Vec<Self::Transaction>;
}
@@ -81,7 +80,7 @@ pub type TransactionOf<P> = <<P as TransactionProofPipeline>::Block as SourceBlo
pub type TransactionHashOf<P> = <TransactionOf<P> as SourceTransaction>::Hash;
/// Header id.
pub type HeaderId<P> = crate::utils::HeaderId<BlockHashOf<P>, BlockNumberOf<P>>;
pub type HeaderId<P> = relay_utils::HeaderId<BlockHashOf<P>, BlockNumberOf<P>>;
/// Source client API.
#[async_trait]
@@ -443,9 +442,9 @@ async fn wait_header_finalized<P: TransactionProofPipeline>(
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::utils::HeaderId;
use parking_lot::Mutex;
use relay_utils::HeaderId;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
@@ -21,12 +21,14 @@ use crate::exchange::{
TransactionProofPipeline,
};
use crate::exchange_loop_metrics::ExchangeLoopMetrics;
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
use crate::utils::retry_backoff;
use backoff::backoff::Backoff;
use futures::{future::FutureExt, select};
use num_traits::One;
use relay_utils::{
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
retry_backoff,
};
use std::{future::Future, time::Duration};
/// Delay after connection-related error happened before we'll try
@@ -95,8 +97,8 @@ pub fn run<P: TransactionProofPipeline>(
let mut state = storage.state();
let mut current_finalized_block = None;
let mut metrics_global = GlobalMetrics::new();
let mut metrics_exch = ExchangeLoopMetrics::new();
let mut metrics_global = GlobalMetrics::default();
let mut metrics_exch = ExchangeLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
@@ -14,8 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Metrics for currency-exchange relay loop.
use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
use crate::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
use relay_utils::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
/// Exchange transactions relay metrics.
pub struct ExchangeLoopMetrics {
@@ -36,9 +38,8 @@ impl Metrics for ExchangeLoopMetrics {
}
}
impl ExchangeLoopMetrics {
/// Creates sync loop metrics.
pub fn new() -> Self {
impl Default for ExchangeLoopMetrics {
fn default() -> Self {
ExchangeLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best finalized block numbers"),
@@ -54,7 +55,9 @@ impl ExchangeLoopMetrics {
.expect("metric is static and thus valid; qed"),
}
}
}
impl ExchangeLoopMetrics {
/// Update metrics when single block is relayed.
pub fn update<P: TransactionProofPipeline>(
&mut self,
+26
View File
@@ -0,0 +1,26 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying [`currency-exchange`](../pallet_bridge_currency_exchange/index.html) application
//! specific data. Currency exchange application allows exchanging tokens between bridged chains.
//! This module provides entrypoints for crafting and submitting (single and multiple)
//! proof-of-exchange-at-source-chain transaction(s) to target chain.
#![warn(missing_docs)]
pub mod exchange;
pub mod exchange_loop;
pub mod exchange_loop_metrics;
+17
View File
@@ -0,0 +1,17 @@
[package]
name = "headers-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
async-std = "1.6.2"
async-trait = "0.1.40"
backoff = "0.2"
futures = "0.3.5"
linked-hash-map = "0.5.3"
log = "0.4.11"
num-traits = "0.2"
parking_lot = "0.11.0"
relay-utils = { path = "../utils" }
@@ -14,11 +14,17 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Headers queue - the intermediate buffer that is filled when headers are read
//! from the source chain. Headers are removed from the queue once they become
//! known to the target chain. Inside, there are several sub-queues, where headers
//! may stay until source/target chain state isn't updated. When a header reaches the
//! `ready` sub-queue, it may be submitted to the target chain.
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader};
use crate::utils::HeaderId;
use linked_hash_map::LinkedHashMap;
use num_traits::{One, Zero};
use relay_utils::HeaderId;
use std::{
collections::{btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet},
time::{Duration, Instant},
@@ -32,7 +38,7 @@ type KnownHeaders<P> =
/// We're trying to fetch completion data for single header at this interval.
const RETRY_FETCH_COMPLETION_INTERVAL: Duration = Duration::from_secs(20);
/// Ethereum headers queue.
/// Headers queue.
#[derive(Debug)]
pub struct QueuedHeaders<P: HeadersSyncPipeline> {
/// Headers that are received from source node, but we (native sync code) have
@@ -80,9 +86,8 @@ struct HeaderCompletion<Completion> {
pub completion: Completion,
}
impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
/// Returns new QueuedHeaders.
pub fn new() -> Self {
impl<P: HeadersSyncPipeline> Default for QueuedHeaders<P> {
fn default() -> Self {
QueuedHeaders {
maybe_orphan: HeadersQueue::new(),
orphan: HeadersQueue::new(),
@@ -98,7 +103,9 @@ impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
prune_border: Zero::zero(),
}
}
}
impl<P: HeadersSyncPipeline> QueuedHeaders<P> {
/// Returns prune border.
#[cfg(test)]
pub fn prune_border(&self) -> P::Number {
@@ -778,57 +785,56 @@ fn queued_incomplete_header<Id: Clone + Eq + std::hash::Hash, T>(
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, H256};
use crate::sync_loop_tests::{TestHash, TestHeader, TestHeaderId, TestHeadersSyncPipeline, TestNumber};
use crate::sync_types::QueuedHeader;
pub(crate) fn header(number: u64) -> QueuedHeader<EthereumHeadersSyncPipeline> {
QueuedHeader::new(Header {
number: Some(number.into()),
hash: Some(hash(number)),
pub(crate) fn header(number: TestNumber) -> QueuedHeader<TestHeadersSyncPipeline> {
QueuedHeader::new(TestHeader {
number,
hash: hash(number),
parent_hash: hash(number - 1),
..Default::default()
})
}
pub(crate) fn hash(number: u64) -> H256 {
H256::from_low_u64_le(number)
pub(crate) fn hash(number: TestNumber) -> TestHash {
number
}
pub(crate) fn id(number: u64) -> EthereumHeaderId {
pub(crate) fn id(number: TestNumber) -> TestHeaderId {
HeaderId(number, hash(number))
}
#[test]
fn total_headers_works() {
// total headers just sums up number of headers in every queue
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue.maybe_orphan.entry(1).or_default().insert(
hash(1),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.maybe_orphan.entry(1).or_default().insert(
hash(2),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.maybe_orphan.entry(2).or_default().insert(
hash(3),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.orphan.entry(3).or_default().insert(
hash(4),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.maybe_extra.entry(4).or_default().insert(
hash(5),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.ready.entry(5).or_default().insert(
hash(6),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.incomplete.entry(6).or_default().insert(
hash(7),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.total_headers(), 7);
}
@@ -836,48 +842,48 @@ pub(crate) mod tests {
#[test]
fn best_queued_number_works() {
// initially there are headers in MaybeOrphan queue only
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue.maybe_orphan.entry(1).or_default().insert(
hash(1),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.maybe_orphan.entry(1).or_default().insert(
hash(2),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
queue.maybe_orphan.entry(3).or_default().insert(
hash(3),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 3);
// and then there's better header in Orphan
queue.orphan.entry(10).or_default().insert(
hash(10),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 10);
// and then there's better header in MaybeExtra
queue.maybe_extra.entry(20).or_default().insert(
hash(20),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 20);
// and then there's better header in Ready
queue.ready.entry(30).or_default().insert(
hash(30),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 30);
// and then there's better header in MaybeOrphan again
queue.maybe_orphan.entry(40).or_default().insert(
hash(40),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 40);
// and then there's some header in Incomplete
queue.incomplete.entry(50).or_default().insert(
hash(50),
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()),
QueuedHeader::<TestHeadersSyncPipeline>::new(Default::default()),
);
assert_eq!(queue.best_queued_number(), 50);
}
@@ -885,7 +891,7 @@ pub(crate) mod tests {
#[test]
fn status_works() {
// all headers are unknown initially
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
assert_eq!(queue.status(&id(10)), HeaderStatus::Unknown);
// and status is read from the KnownHeaders
queue
@@ -899,22 +905,22 @@ pub(crate) mod tests {
#[test]
fn header_works() {
// initially we have oldest header #10
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue.maybe_orphan.entry(10).or_default().insert(hash(1), header(100));
assert_eq!(
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash.unwrap(),
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash,
hash(100)
);
// inserting #20 changes nothing
queue.maybe_orphan.entry(20).or_default().insert(hash(1), header(101));
assert_eq!(
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash.unwrap(),
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash,
hash(100)
);
// inserting #5 makes it oldest
queue.maybe_orphan.entry(5).or_default().insert(hash(1), header(102));
assert_eq!(
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash.unwrap(),
queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash,
hash(102)
);
}
@@ -922,7 +928,7 @@ pub(crate) mod tests {
#[test]
fn header_response_works() {
// when parent is Synced, we insert to MaybeExtra
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -932,7 +938,7 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra);
// when parent is Ready, we insert to MaybeExtra
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -942,7 +948,7 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra);
// when parent is Receipts, we insert to MaybeExtra
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -952,7 +958,7 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra);
// when parent is MaybeExtra, we insert to MaybeExtra
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -962,7 +968,7 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra);
// when parent is Orphan, we insert to Orphan
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -972,7 +978,7 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(101)), HeaderStatus::Orphan);
// when parent is MaybeOrphan, we insert to MaybeOrphan
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -982,7 +988,7 @@ pub(crate) mod tests {
assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeOrphan);
// when parent is unknown, we insert to MaybeOrphan
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue.header_response(header(101).header().clone());
assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeOrphan);
}
@@ -996,7 +1002,7 @@ pub(crate) mod tests {
// #98 in MaybeExtra
// #97 in Receipts
// #96 in Ready
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -1053,7 +1059,7 @@ pub(crate) mod tests {
// #101 in Orphan
// #102 in MaybeOrphan
// #103 in Orphan
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(101)
@@ -1095,7 +1101,7 @@ pub(crate) mod tests {
// #102 in MaybeOrphan
// and we have asked for MaybeOrphan status of #100.parent (i.e. #99)
// and the response is: YES, #99 is known to the Substrate runtime
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -1140,7 +1146,7 @@ pub(crate) mod tests {
// #101 in MaybeOrphan
// and we have asked for MaybeOrphan status of #100.parent (i.e. #99)
// and the response is: NO, #99 is NOT known to the Substrate runtime
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -1172,7 +1178,7 @@ pub(crate) mod tests {
#[test]
fn positive_maybe_extra_response_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -1188,7 +1194,7 @@ pub(crate) mod tests {
#[test]
fn negative_maybe_extra_response_works() {
// when parent header is complete
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -1217,14 +1223,14 @@ pub(crate) mod tests {
#[test]
fn receipts_response_works() {
// when parent header is complete
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
.or_default()
.insert(hash(100), HeaderStatus::Extra);
queue.extra.entry(100).or_default().insert(hash(100), header(100));
queue.extra_response(&id(100), Vec::new());
queue.extra_response(&id(100), 100_100);
assert!(queue.extra.is_empty());
assert_eq!(queue.ready.len(), 1);
assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready);
@@ -1237,7 +1243,7 @@ pub(crate) mod tests {
.or_default()
.insert(hash(201), HeaderStatus::Extra);
queue.extra.entry(201).or_default().insert(hash(201), header(201));
queue.extra_response(&id(201), Vec::new());
queue.extra_response(&id(201), 201_201);
assert!(queue.extra.is_empty());
assert_eq!(queue.incomplete.len(), 1);
assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete);
@@ -1245,7 +1251,7 @@ pub(crate) mod tests {
#[test]
fn header_submitted_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(100)
@@ -1259,7 +1265,7 @@ pub(crate) mod tests {
#[test]
fn incomplete_header_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
// nothing to complete if queue is empty
assert_eq!(queue.incomplete_header(), None);
@@ -1282,7 +1288,7 @@ pub(crate) mod tests {
#[test]
fn completion_response_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue.incomplete_headers.insert(id(100), None);
queue.incomplete_headers.insert(id(200), Some(Instant::now()));
@@ -1299,18 +1305,18 @@ pub(crate) mod tests {
assert_eq!(queue.header_to_complete(), None);
// when response is Some, we're scheduling completion
queue.completion_response(&id(200), Some(()));
queue.completion_response(&id(200), Some(200_200));
assert_eq!(queue.incomplete_headers.len(), 2);
assert_eq!(queue.completion_data.len(), 1);
assert!(queue.incomplete_headers.contains_key(&id(100)));
assert!(queue.completion_data.contains_key(&id(200)));
assert_eq!(queue.header_to_complete(), Some((id(200), &())));
assert_eq!(queue.header_to_complete(), Some((id(200), &200_200)));
}
#[test]
fn header_completed_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
queue.completion_data.insert(id(100), ());
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue.completion_data.insert(id(100), 100_100);
// when unknown header is completed
queue.header_completed(&id(200));
@@ -1323,7 +1329,7 @@ pub(crate) mod tests {
#[test]
fn incomplete_headers_response_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
// when we have already submitted #101 and #102 is ready
queue
@@ -1370,7 +1376,7 @@ pub(crate) mod tests {
#[test]
fn is_parent_incomplete_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
// when we do not know header itself
assert_eq!(queue.is_parent_incomplete(&id(50)), false);
@@ -1404,7 +1410,7 @@ pub(crate) mod tests {
assert_eq!(queue.is_parent_incomplete(&id(200)), true);
// when parent is the incomplete header and we have completion data
queue.completion_data.insert(id(299), ());
queue.completion_data.insert(id(299), 299_299);
queue
.known_headers
.entry(300)
@@ -1416,7 +1422,7 @@ pub(crate) mod tests {
#[test]
fn prune_works() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
queue
.known_headers
.entry(105)
@@ -1487,7 +1493,7 @@ pub(crate) mod tests {
#[test]
fn incomplete_headers_are_still_incomplete_after_advance() {
let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new();
let mut queue = QueuedHeaders::<TestHeadersSyncPipeline>::default();
// relay#1 knows that header#100 is incomplete && it has headers 101..104 in incomplete queue
queue.incomplete_headers.insert(id(100), None);
+33
View File
@@ -0,0 +1,33 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying source chain headers to target chain. This module provides entrypoint
//! that starts reading new headers from source chain and submit these headers as
//! module/contract transactions to the target chain. Module/contract on the target
//! chain is a light-client of the source chain. All other trustless bridge
//! applications are built using this light-client, so running headers-relay is
//! essential for running all other bridge applications.
// required for futures::select!
#![recursion_limit = "1024"]
#![warn(missing_docs)]
pub mod headers;
pub mod sync;
pub mod sync_loop;
pub mod sync_loop_metrics;
pub mod sync_loop_tests;
pub mod sync_types;
@@ -14,6 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Headers synchronization context. This structure wraps headers queue and is
//! able to choose: which headers to read from the source chain? Which headers
//! to submit to the target chain? The context makes decisions basing on parameters
//! passed using `HeadersSyncParams` structure.
use crate::headers::QueuedHeaders;
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader};
use num_traits::{One, Saturating, Zero};
@@ -36,38 +41,6 @@ pub struct HeadersSyncParams {
pub target_tx_mode: TargetTransactionMode,
}
impl HeadersSyncParams {
/// Default parameters for syncing Ethereum headers.
pub fn ethereum_sync_default() -> Self {
use crate::ethereum_sync_loop::consts::*;
Self {
max_future_headers_to_download: MAX_FUTURE_HEADERS_TO_DOWNLOAD,
max_headers_in_submitted_status: MAX_SUBMITTED_HEADERS,
max_headers_in_single_submit: MAX_HEADERS_IN_SINGLE_SUBMIT,
max_headers_size_in_single_submit: MAX_HEADERS_SIZE_IN_SINGLE_SUBMIT,
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
}
}
/// Default parameters for syncing Substrate headers.
pub fn substrate_sync_default() -> Self {
use crate::substrate_sync_loop::consts::*;
Self {
max_future_headers_to_download: MAX_FUTURE_HEADERS_TO_DOWNLOAD,
max_headers_in_submitted_status: MAX_SUBMITTED_HEADERS,
// since we always have single Substrate header in separate Ethereum transaction,
// all max_**_in_single_submit aren't important here
max_headers_in_single_submit: 4,
max_headers_size_in_single_submit: std::usize::MAX,
prune_depth: PRUNE_DEPTH,
target_tx_mode: TargetTransactionMode::Signed,
}
}
}
/// Target transaction mode.
#[derive(Debug, PartialEq, Clone)]
pub enum TargetTransactionMode {
@@ -99,7 +72,7 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
/// Creates new headers synchronizer.
pub fn new(params: HeadersSyncParams) -> Self {
HeadersSync {
headers: QueuedHeaders::new(),
headers: QueuedHeaders::default(),
params,
source_best_number: None,
target_best_header: None,
@@ -308,13 +281,13 @@ impl<P: HeadersSyncPipeline> HeadersSync<P> {
#[cfg(test)]
pub mod tests {
use super::*;
use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256};
use crate::headers::tests::{header, id};
use crate::sync_loop_tests::{TestHash, TestHeadersSyncPipeline, TestNumber};
use crate::sync_types::HeaderStatus;
use crate::utils::HeaderId;
use relay_utils::HeaderId;
fn side_hash(number: u64) -> H256 {
H256::from_low_u64_le(1000 + number)
fn side_hash(number: TestNumber) -> TestHash {
1000 + number
}
pub fn default_sync_params() -> HeadersSyncParams {
@@ -330,7 +303,7 @@ pub mod tests {
#[test]
fn select_new_header_to_download_works() {
let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(default_sync_params());
let mut eth_sync = HeadersSync::<TestHeadersSyncPipeline>::new(default_sync_params());
// both best && target headers are unknown
assert_eq!(eth_sync.select_new_header_to_download(), None);
@@ -366,7 +339,7 @@ pub mod tests {
#[test]
fn select_new_header_to_download_works_with_empty_queue() {
let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(default_sync_params());
let mut eth_sync = HeadersSync::<TestHeadersSyncPipeline>::new(default_sync_params());
eth_sync.source_best_header_number_response(100);
// when queue is not empty => everything goes as usually
@@ -489,7 +462,7 @@ pub mod tests {
#[test]
fn pruning_happens_on_target_best_header_response() {
let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(default_sync_params());
let mut eth_sync = HeadersSync::<TestHeadersSyncPipeline>::new(default_sync_params());
eth_sync.params.prune_depth = 50;
eth_sync.target_best_header_response(id(100));
assert_eq!(eth_sync.headers.prune_border(), 50);
@@ -14,17 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
//! Entrypoint for running headers synchronization loop.
use crate::sync::HeadersSyncParams;
use crate::sync_loop_metrics::SyncLoopMetrics;
use crate::sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders};
use crate::utils::{
format_ids, interval, process_future_result, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError,
};
use async_trait::async_trait;
use futures::{future::FutureExt, stream::StreamExt};
use num_traits::{Saturating, Zero};
use relay_utils::{
format_ids, interval,
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
process_future_result, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError,
};
use std::{
collections::HashSet,
future::Future,
@@ -117,8 +120,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut stall_countdown = None;
let mut last_update_time = Instant::now();
let mut metrics_global = GlobalMetrics::new();
let mut metrics_sync = SyncLoopMetrics::new();
let mut metrics_global = GlobalMetrics::default();
let mut metrics_sync = SyncLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME),
@@ -14,11 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
//! Metrics for headers synchronization relay loop.
use crate::sync::HeadersSync;
use crate::sync_types::{HeaderStatus, HeadersSyncPipeline};
use num_traits::Zero;
use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64};
/// Headers sync metrics.
pub struct SyncLoopMetrics {
@@ -36,9 +38,8 @@ impl Metrics for SyncLoopMetrics {
}
}
impl SyncLoopMetrics {
/// Creates sync loop metrics.
pub fn new() -> Self {
impl Default for SyncLoopMetrics {
fn default() -> Self {
SyncLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best block numbers on source and target nodes"),
@@ -52,7 +53,9 @@ impl SyncLoopMetrics {
.expect("metric is static and thus valid; qed"),
}
}
}
impl SyncLoopMetrics {
/// Update metrics.
pub fn update<P: HeadersSyncPipeline>(&mut self, sync: &HeadersSync<P>) {
let headers = sync.headers();
@@ -18,30 +18,30 @@
use crate::sync_loop::{run, SourceClient, TargetClient};
use crate::sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders};
use crate::utils::{process_future_result, retry_backoff, HeaderId, MaybeConnectionError};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use futures::{future::FutureExt, stream::StreamExt};
use parking_lot::Mutex;
use relay_utils::{process_future_result, retry_backoff, HeaderId, MaybeConnectionError};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
type TestNumber = u64;
type TestHash = u64;
type TestExtra = u64;
type TestCompletion = u64;
type TestHeaderId = HeaderId<TestHash, TestNumber>;
type TestQueuedHeader = QueuedHeader<TestHeadersSyncPipeline>;
pub type TestNumber = u64;
pub type TestHash = u64;
pub type TestHeaderId = HeaderId<TestHash, TestNumber>;
pub type TestExtra = u64;
pub type TestCompletion = u64;
pub type TestQueuedHeader = QueuedHeader<TestHeadersSyncPipeline>;
#[derive(Debug, Clone, PartialEq)]
struct TestHeader {
hash: TestHash,
number: TestNumber,
parent_hash: TestHash,
#[derive(Default, Debug, Clone, PartialEq)]
pub struct TestHeader {
pub hash: TestHash,
pub number: TestNumber,
pub parent_hash: TestHash,
}
impl SourceHeader<TestHash, TestNumber> for TestHeader {
@@ -63,8 +63,8 @@ impl MaybeConnectionError for TestError {
}
}
#[derive(Debug, Clone, Copy)]
struct TestHeadersSyncPipeline;
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TestHeadersSyncPipeline;
impl HeadersSyncPipeline for TestHeadersSyncPipeline {
const SOURCE_NAME: &'static str = "Source";
@@ -14,8 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::utils::{format_ids, HeaderId};
//! Types that are used by headers synchronization components.
use relay_utils::{format_ids, HeaderId};
use std::{ops::Deref, sync::Arc};
/// Ethereum header synchronization status.
@@ -99,8 +100,7 @@ pub trait SourceHeader<Hash, Number> {
}
/// Header how it's stored in the synchronization queue.
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Clone, Debug, PartialEq)]
pub struct QueuedHeader<P: HeadersSyncPipeline>(Arc<QueuedHeaderData<P>>);
impl<P: HeadersSyncPipeline> QueuedHeader<P> {
@@ -129,8 +129,7 @@ impl<P: HeadersSyncPipeline> Deref for QueuedHeader<P> {
}
/// Header how it's stored in the synchronization queue.
#[derive(Clone, Debug, Default)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct QueuedHeaderData<P: HeadersSyncPipeline> {
header: P::Header,
extra: Option<P::Extra>,
+15
View File
@@ -0,0 +1,15 @@
[package]
name = "messages-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
async-std = "1.6.2"
async-trait = "0.1.40"
futures = "0.3.5"
log = "0.4.11"
num-traits = "0.2"
parking_lot = "0.11.0"
relay-utils = { path = "../utils" }
+32
View File
@@ -0,0 +1,32 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying [`message-lane`](../pallet_message_lane/index.html) application specific
//! data. Message lane allows sending arbitrary messages between bridged chains. This
//! module provides entrypoint that starts reading messages from given message lane
//! of source chain and submits proof-of-message-at-source-chain transactions to the
//! target chain. Additionaly, proofs-of-messages-delivery are sent back from the
//! target chain to the source chain.
// required for futures::select!
#![recursion_limit = "1024"]
#![warn(missing_docs)]
pub mod message_lane;
pub mod message_lane_loop;
pub mod message_race_delivery;
pub mod message_race_loop;
pub mod message_race_receiving;
@@ -19,7 +19,7 @@
//! 1) relay new messages from source to target node;
//! 2) relay proof-of-receiving from target to source node.
use crate::utils::HeaderId;
use relay_utils::HeaderId;
use num_traits::{One, Zero};
use std::fmt::Debug;
@@ -30,10 +30,10 @@
use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use crate::message_race_delivery::run as run_message_delivery_race;
use crate::message_race_receiving::run as run_message_receiving_race;
use crate::utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
use async_trait::async_trait;
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
use relay_utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
/// Source client trait.
@@ -334,9 +334,9 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::utils::HeaderId;
use futures::stream::StreamExt;
use parking_lot::Mutex;
use relay_utils::HeaderId;
use std::sync::Arc;
pub fn header_id(number: TestSourceHeaderNumber) -> HeaderId<TestSourceHeaderNumber, TestSourceHeaderHash> {
@@ -19,11 +19,11 @@ use crate::message_lane_loop::{
TargetClientState,
};
use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient};
use crate::utils::{FailedClient, HeaderId};
use async_trait::async_trait;
use futures::stream::FusedStream;
use num_traits::{One, Zero};
use relay_utils::{FailedClient, HeaderId};
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration};
/// Maximal number of messages to relay in single transaction.
@@ -24,13 +24,13 @@
#![allow(dead_code)]
use crate::message_lane_loop::ClientState;
use crate::utils::{process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
use async_trait::async_trait;
use futures::{
future::FutureExt,
stream::{FusedStream, StreamExt},
};
use relay_utils::{process_future_result, retry_backoff, FailedClient, MaybeConnectionError};
use std::{
fmt::Debug,
ops::RangeInclusive,
@@ -11,6 +11,8 @@
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//! Message receiving race delivers proof-of-messages-delivery from lane.target to lane.source.
use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use crate::message_lane_loop::{
SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient,
@@ -18,10 +20,10 @@ use crate::message_lane_loop::{
};
use crate::message_race_delivery::DeliveryStrategy;
use crate::message_race_loop::{MessageRace, SourceClient, TargetClient};
use crate::utils::FailedClient;
use async_trait::async_trait;
use futures::stream::FusedStream;
use relay_utils::FailedClient;
use std::{marker::PhantomData, ops::RangeInclusive, time::Duration};
/// Message receiving confirmations delivery strategy.
+16
View File
@@ -0,0 +1,16 @@
[package]
name = "relay-utils"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
async-std = "1.6.2"
backoff = "0.2"
futures = "0.3.5"
log = "0.4.11"
sysinfo = "0.15"
# Substrate dependencies
substrate-prometheus-endpoint = { version = "0.8.0-rc6", git = "https://github.com/paritytech/substrate.git", tag = "v2.0.0-rc6" }
@@ -14,16 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Utilities used by different relays.
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::future::FutureExt;
use std::time::Duration;
/// Max delay after connection-unrelated error happened before we'll try the
/// same request again.
const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
pub const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
/// Delay after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
pub mod metrics;
/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
#[macro_export]
@@ -176,7 +180,7 @@ impl ProcessFutureResult {
}
/// Process result of the future from a client.
pub(crate) fn process_future_result<TResult, TError, TGoOfflineFuture>(
pub fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
retry_backoff: &mut ExponentialBackoff,
on_success: impl FnOnce(TResult),
@@ -107,9 +107,8 @@ impl Metrics for GlobalMetrics {
}
}
impl GlobalMetrics {
/// Creates global metrics.
pub fn new() -> Self {
impl Default for GlobalMetrics {
fn default() -> Self {
GlobalMetrics {
system: System::new_with_specifics(RefreshKind::everything()),
system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
@@ -123,7 +122,9 @@ impl GlobalMetrics {
.expect("metric is static and thus valid; qed"),
}
}
}
impl GlobalMetrics {
/// Update metrics.
pub fn update(&mut self) {
// update system-wide metrics