Adding Bridges code as git subtree. (#2515)

* Add instructions.

* Squashed 'bridges/' content from commit 345e84a21

git-subtree-dir: bridges
git-subtree-split: 345e84a2146b56628e9888c9f5e129cb40e868a9

* Remove bridges workspace file to avoid confusing Cargo.

* Add some bridges primitives to Polkadot workspace.

* Improve docs.
This commit is contained in:
Tomasz Drwięga
2021-03-01 22:33:16 +01:00
committed by GitHub
parent 7a2c7aa3fe
commit 5169155f94
291 changed files with 64249 additions and 0 deletions
@@ -0,0 +1,16 @@
[package]
name = "exchange-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
futures = "0.3.5"
log = "0.4.11"
num-traits = "0.2"
parking_lot = "0.11.0"
relay-utils = { path = "../utils" }
@@ -0,0 +1,916 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying proofs of exchange transaction.
use async_trait::async_trait;
use relay_utils::{
relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError,
};
use std::{
fmt::{Debug, Display},
string::ToString,
};
/// Transaction proof pipeline.
pub trait TransactionProofPipeline {
/// Name of the transaction proof source.
const SOURCE_NAME: &'static str;
/// Name of the transaction proof target.
const TARGET_NAME: &'static str;
/// Block type.
type Block: SourceBlock;
/// Transaction inclusion proof type.
type TransactionProof;
}
/// Block that is participating in exchange.
pub trait SourceBlock {
/// Block hash type.
type Hash: Clone + Debug + Display;
/// Block number type.
type Number: Debug
+ Display
+ Clone
+ Copy
+ Into<u64>
+ std::cmp::Ord
+ std::ops::Add<Output = Self::Number>
+ num_traits::One;
/// Block transaction.
type Transaction: SourceTransaction;
/// Return hash of the block.
fn id(&self) -> relay_utils::HeaderId<Self::Hash, Self::Number>;
/// Return block transactions iterator.
fn transactions(&self) -> Vec<Self::Transaction>;
}
/// Transaction that is participating in exchange.
pub trait SourceTransaction {
/// Transaction hash type.
type Hash: Debug + Display;
/// Return transaction hash.
fn hash(&self) -> Self::Hash;
}
/// Block hash for given pipeline.
pub type BlockHashOf<P> = <<P as TransactionProofPipeline>::Block as SourceBlock>::Hash;
/// Block number for given pipeline.
pub type BlockNumberOf<P> = <<P as TransactionProofPipeline>::Block as SourceBlock>::Number;
/// Transaction hash for given pipeline.
pub type TransactionOf<P> = <<P as TransactionProofPipeline>::Block as SourceBlock>::Transaction;
/// Transaction hash for given pipeline.
pub type TransactionHashOf<P> = <TransactionOf<P> as SourceTransaction>::Hash;
/// Header id.
pub type HeaderId<P> = relay_utils::HeaderId<BlockHashOf<P>, BlockNumberOf<P>>;
/// Source client API.
#[async_trait]
pub trait SourceClient<P: TransactionProofPipeline>: RelayClient {
/// Sleep until exchange-related data is (probably) updated.
async fn tick(&self);
/// Get block by hash.
async fn block_by_hash(&self, hash: BlockHashOf<P>) -> Result<P::Block, Self::Error>;
/// Get canonical block by number.
async fn block_by_number(&self, number: BlockNumberOf<P>) -> Result<P::Block, Self::Error>;
/// Return block + index where transaction has been **mined**. May return `Ok(None)` if transaction
/// is unknown to the source node.
async fn transaction_block(&self, hash: &TransactionHashOf<P>)
-> Result<Option<(HeaderId<P>, usize)>, Self::Error>;
/// Prepare transaction proof.
async fn transaction_proof(&self, block: &P::Block, tx_index: usize) -> Result<P::TransactionProof, Self::Error>;
}
/// Target client API.
#[async_trait]
pub trait TargetClient<P: TransactionProofPipeline>: RelayClient {
/// Sleep until exchange-related data is (probably) updated.
async fn tick(&self);
/// Returns `Ok(true)` if header is known to the target node.
async fn is_header_known(&self, id: &HeaderId<P>) -> Result<bool, Self::Error>;
/// Returns `Ok(true)` if header is finalized by the target node.
async fn is_header_finalized(&self, id: &HeaderId<P>) -> Result<bool, Self::Error>;
/// Returns best finalized header id.
async fn best_finalized_header_id(&self) -> Result<HeaderId<P>, Self::Error>;
/// Returns `Ok(true)` if transaction proof is need to be relayed.
async fn filter_transaction_proof(&self, proof: &P::TransactionProof) -> Result<bool, Self::Error>;
/// Submits transaction proof to the target node.
async fn submit_transaction_proof(&self, proof: P::TransactionProof) -> Result<(), Self::Error>;
}
/// Block transaction statistics.
#[derive(Debug, Default)]
#[cfg_attr(test, derive(PartialEq))]
pub struct RelayedBlockTransactions {
/// Total number of transactions processed (either relayed or ignored) so far.
pub processed: usize,
/// Total number of transactions successfully relayed so far.
pub relayed: usize,
/// Total number of transactions that we have failed to relay so far.
pub failed: usize,
}
/// Relay all suitable transactions from single block.
///
/// If connection error occurs, returns Err with number of successfully processed transactions.
/// If some other error occurs, it is ignored and other transactions are processed.
///
/// All transaction-level traces are written by this function. This function is not tracing
/// any information about block.
pub async fn relay_block_transactions<P: TransactionProofPipeline>(
source_client: &impl SourceClient<P>,
target_client: &impl TargetClient<P>,
source_block: &P::Block,
mut relayed_transactions: RelayedBlockTransactions,
) -> Result<RelayedBlockTransactions, (FailedClient, RelayedBlockTransactions)> {
let transactions_to_process = source_block
.transactions()
.into_iter()
.enumerate()
.skip(relayed_transactions.processed);
for (source_tx_index, source_tx) in transactions_to_process {
let result = async {
let source_tx_id = format!("{}/{}", source_block.id().1, source_tx_index);
let source_tx_proof =
prepare_transaction_proof(source_client, &source_tx_id, source_block, source_tx_index)
.await
.map_err(|e| (FailedClient::Source, e))?;
let needs_to_be_relayed =
target_client
.filter_transaction_proof(&source_tx_proof)
.await
.map_err(|err| {
(
FailedClient::Target,
StringifiedMaybeConnectionError::new(
err.is_connection_error(),
format!("Transaction filtering has failed with {:?}", err),
),
)
})?;
if !needs_to_be_relayed {
return Ok(false);
}
relay_ready_transaction_proof(target_client, &source_tx_id, source_tx_proof)
.await
.map(|_| true)
.map_err(|e| (FailedClient::Target, e))
}
.await;
// We have two options here:
// 1) retry with the same transaction later;
// 2) report error and proceed with next transaction.
//
// Option#1 may seems better, but:
// 1) we do not track if transaction is mined (without an error) by the target node;
// 2) error could be irrecoverable (e.g. when block is already pruned by bridge module or tx
// has invalid format) && we'll end up in infinite loop of retrying the same transaction proof.
//
// So we're going with option#2 here (the only exception are connection errors).
match result {
Ok(false) => {
relayed_transactions.processed += 1;
}
Ok(true) => {
log::info!(
target: "bridge",
"{} transaction {} proof has been successfully submitted to {} node",
P::SOURCE_NAME,
source_tx.hash(),
P::TARGET_NAME,
);
relayed_transactions.processed += 1;
relayed_transactions.relayed += 1;
}
Err((failed_client, err)) => {
log::error!(
target: "bridge",
"Error relaying {} transaction {} proof to {} node: {}. {}",
P::SOURCE_NAME,
source_tx.hash(),
P::TARGET_NAME,
err.to_string(),
if err.is_connection_error() {
"Going to retry after delay..."
} else {
"You may need to submit proof of this transaction manually"
},
);
if err.is_connection_error() {
return Err((failed_client, relayed_transactions));
}
relayed_transactions.processed += 1;
relayed_transactions.failed += 1;
}
}
}
Ok(relayed_transactions)
}
/// Relay single transaction proof.
pub async fn relay_single_transaction_proof<P: TransactionProofPipeline>(
source_client: &impl SourceClient<P>,
target_client: &impl TargetClient<P>,
source_tx_hash: TransactionHashOf<P>,
) -> Result<(), String> {
// wait for transaction and header on source node
let (source_header_id, source_tx_index) = wait_transaction_mined(source_client, &source_tx_hash).await?;
let source_block = source_client.block_by_hash(source_header_id.1.clone()).await;
let source_block = source_block.map_err(|err| {
format!(
"Error retrieving block {} from {} node: {:?}",
source_header_id.1,
P::SOURCE_NAME,
err,
)
})?;
// wait for transaction and header on target node
wait_header_imported(target_client, &source_header_id).await?;
wait_header_finalized(target_client, &source_header_id).await?;
// and finally - prepare and submit transaction proof to target node
let source_tx_id = format!("{}", source_tx_hash);
relay_ready_transaction_proof(
target_client,
&source_tx_id,
prepare_transaction_proof(source_client, &source_tx_id, &source_block, source_tx_index)
.await
.map_err(|err| err.to_string())?,
)
.await
.map_err(|err| err.to_string())
}
/// Prepare transaction proof.
async fn prepare_transaction_proof<P: TransactionProofPipeline>(
source_client: &impl SourceClient<P>,
source_tx_id: &str,
source_block: &P::Block,
source_tx_index: usize,
) -> Result<P::TransactionProof, StringifiedMaybeConnectionError> {
source_client
.transaction_proof(source_block, source_tx_index)
.await
.map_err(|err| {
StringifiedMaybeConnectionError::new(
err.is_connection_error(),
format!(
"Error building transaction {} proof on {} node: {:?}",
source_tx_id,
P::SOURCE_NAME,
err,
),
)
})
}
/// Relay prepared proof of transaction.
async fn relay_ready_transaction_proof<P: TransactionProofPipeline>(
target_client: &impl TargetClient<P>,
source_tx_id: &str,
source_tx_proof: P::TransactionProof,
) -> Result<(), StringifiedMaybeConnectionError> {
target_client
.submit_transaction_proof(source_tx_proof)
.await
.map_err(|err| {
StringifiedMaybeConnectionError::new(
err.is_connection_error(),
format!(
"Error submitting transaction {} proof to {} node: {:?}",
source_tx_id,
P::TARGET_NAME,
err,
),
)
})
}
/// Wait until transaction is mined by source node.
async fn wait_transaction_mined<P: TransactionProofPipeline>(
source_client: &impl SourceClient<P>,
source_tx_hash: &TransactionHashOf<P>,
) -> Result<(HeaderId<P>, usize), String> {
loop {
let source_header_and_tx = source_client.transaction_block(&source_tx_hash).await.map_err(|err| {
format!(
"Error retrieving transaction {} from {} node: {:?}",
source_tx_hash,
P::SOURCE_NAME,
err,
)
})?;
match source_header_and_tx {
Some((source_header_id, source_tx)) => {
log::info!(
target: "bridge",
"Transaction {} is retrieved from {} node. Continuing...",
source_tx_hash,
P::SOURCE_NAME,
);
return Ok((source_header_id, source_tx));
}
None => {
log::info!(
target: "bridge",
"Waiting for transaction {} to be mined by {} node...",
source_tx_hash,
P::SOURCE_NAME,
);
source_client.tick().await;
}
}
}
}
/// Wait until target node imports required header.
async fn wait_header_imported<P: TransactionProofPipeline>(
target_client: &impl TargetClient<P>,
source_header_id: &HeaderId<P>,
) -> Result<(), String> {
loop {
let is_header_known = target_client.is_header_known(&source_header_id).await.map_err(|err| {
format!(
"Failed to check existence of header {}/{} on {} node: {:?}",
source_header_id.0,
source_header_id.1,
P::TARGET_NAME,
err,
)
})?;
match is_header_known {
true => {
log::info!(
target: "bridge",
"Header {}/{} is known to {} node. Continuing.",
source_header_id.0,
source_header_id.1,
P::TARGET_NAME,
);
return Ok(());
}
false => {
log::info!(
target: "bridge",
"Waiting for header {}/{} to be imported by {} node...",
source_header_id.0,
source_header_id.1,
P::TARGET_NAME,
);
target_client.tick().await;
}
}
}
}
/// Wait until target node finalizes required header.
async fn wait_header_finalized<P: TransactionProofPipeline>(
target_client: &impl TargetClient<P>,
source_header_id: &HeaderId<P>,
) -> Result<(), String> {
loop {
let is_header_finalized = target_client
.is_header_finalized(&source_header_id)
.await
.map_err(|err| {
format!(
"Failed to check finality of header {}/{} on {} node: {:?}",
source_header_id.0,
source_header_id.1,
P::TARGET_NAME,
err,
)
})?;
match is_header_finalized {
true => {
log::info!(
target: "bridge",
"Header {}/{} is finalizd by {} node. Continuing.",
source_header_id.0,
source_header_id.1,
P::TARGET_NAME,
);
return Ok(());
}
false => {
log::info!(
target: "bridge",
"Waiting for header {}/{} to be finalized by {} node...",
source_header_id.0,
source_header_id.1,
P::TARGET_NAME,
);
target_client.tick().await;
}
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use parking_lot::Mutex;
use relay_utils::HeaderId;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
pub fn test_block_id() -> TestHeaderId {
HeaderId(1, 1)
}
pub fn test_next_block_id() -> TestHeaderId {
HeaderId(2, 2)
}
pub fn test_transaction_hash(tx_index: u64) -> TestTransactionHash {
200 + tx_index
}
pub fn test_transaction(tx_index: u64) -> TestTransaction {
TestTransaction(test_transaction_hash(tx_index))
}
pub fn test_block() -> TestBlock {
TestBlock(test_block_id(), vec![test_transaction(0)])
}
pub fn test_next_block() -> TestBlock {
TestBlock(test_next_block_id(), vec![test_transaction(1)])
}
pub type TestBlockNumber = u64;
pub type TestBlockHash = u64;
pub type TestTransactionHash = u64;
pub type TestHeaderId = HeaderId<TestBlockHash, TestBlockNumber>;
#[derive(Debug, Clone, PartialEq)]
pub struct TestError(pub bool);
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
self.0
}
}
pub struct TestTransactionProofPipeline;
impl TransactionProofPipeline for TestTransactionProofPipeline {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type Block = TestBlock;
type TransactionProof = TestTransactionProof;
}
#[derive(Debug, Clone)]
pub struct TestBlock(pub TestHeaderId, pub Vec<TestTransaction>);
impl SourceBlock for TestBlock {
type Hash = TestBlockHash;
type Number = TestBlockNumber;
type Transaction = TestTransaction;
fn id(&self) -> TestHeaderId {
self.0
}
fn transactions(&self) -> Vec<TestTransaction> {
self.1.clone()
}
}
#[derive(Debug, Clone)]
pub struct TestTransaction(pub TestTransactionHash);
impl SourceTransaction for TestTransaction {
type Hash = TestTransactionHash;
fn hash(&self) -> Self::Hash {
self.0
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct TestTransactionProof(pub TestTransactionHash);
#[derive(Clone)]
pub struct TestTransactionsSource {
pub on_tick: Arc<dyn Fn(&mut TestTransactionsSourceData) + Send + Sync>,
pub data: Arc<Mutex<TestTransactionsSourceData>>,
}
pub struct TestTransactionsSourceData {
pub block: Result<TestBlock, TestError>,
pub transaction_block: Result<Option<(TestHeaderId, usize)>, TestError>,
pub proofs_to_fail: HashMap<TestTransactionHash, TestError>,
}
impl TestTransactionsSource {
pub fn new(on_tick: Box<dyn Fn(&mut TestTransactionsSourceData) + Send + Sync>) -> Self {
Self {
on_tick: Arc::new(on_tick),
data: Arc::new(Mutex::new(TestTransactionsSourceData {
block: Ok(test_block()),
transaction_block: Ok(Some((test_block_id(), 0))),
proofs_to_fail: HashMap::new(),
})),
}
}
}
#[async_trait]
impl RelayClient for TestTransactionsSource {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
Ok(())
}
}
#[async_trait]
impl SourceClient<TestTransactionProofPipeline> for TestTransactionsSource {
async fn tick(&self) {
(self.on_tick)(&mut *self.data.lock())
}
async fn block_by_hash(&self, _: TestBlockHash) -> Result<TestBlock, TestError> {
self.data.lock().block.clone()
}
async fn block_by_number(&self, _: TestBlockNumber) -> Result<TestBlock, TestError> {
self.data.lock().block.clone()
}
async fn transaction_block(&self, _: &TestTransactionHash) -> Result<Option<(TestHeaderId, usize)>, TestError> {
self.data.lock().transaction_block.clone()
}
async fn transaction_proof(&self, block: &TestBlock, index: usize) -> Result<TestTransactionProof, TestError> {
let tx_hash = block.1[index].hash();
let proof_error = self.data.lock().proofs_to_fail.get(&tx_hash).cloned();
if let Some(err) = proof_error {
return Err(err);
}
Ok(TestTransactionProof(tx_hash))
}
}
#[derive(Clone)]
pub struct TestTransactionsTarget {
pub on_tick: Arc<dyn Fn(&mut TestTransactionsTargetData) + Send + Sync>,
pub data: Arc<Mutex<TestTransactionsTargetData>>,
}
pub struct TestTransactionsTargetData {
pub is_header_known: Result<bool, TestError>,
pub is_header_finalized: Result<bool, TestError>,
pub best_finalized_header_id: Result<TestHeaderId, TestError>,
pub transactions_to_accept: HashSet<TestTransactionHash>,
pub submitted_proofs: Vec<TestTransactionProof>,
}
impl TestTransactionsTarget {
pub fn new(on_tick: Box<dyn Fn(&mut TestTransactionsTargetData) + Send + Sync>) -> Self {
Self {
on_tick: Arc::new(on_tick),
data: Arc::new(Mutex::new(TestTransactionsTargetData {
is_header_known: Ok(true),
is_header_finalized: Ok(true),
best_finalized_header_id: Ok(test_block_id()),
transactions_to_accept: vec![test_transaction_hash(0)].into_iter().collect(),
submitted_proofs: Vec::new(),
})),
}
}
}
#[async_trait]
impl RelayClient for TestTransactionsTarget {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
Ok(())
}
}
#[async_trait]
impl TargetClient<TestTransactionProofPipeline> for TestTransactionsTarget {
async fn tick(&self) {
(self.on_tick)(&mut *self.data.lock())
}
async fn is_header_known(&self, _: &TestHeaderId) -> Result<bool, TestError> {
self.data.lock().is_header_known.clone()
}
async fn is_header_finalized(&self, _: &TestHeaderId) -> Result<bool, TestError> {
self.data.lock().is_header_finalized.clone()
}
async fn best_finalized_header_id(&self) -> Result<TestHeaderId, TestError> {
self.data.lock().best_finalized_header_id.clone()
}
async fn filter_transaction_proof(&self, proof: &TestTransactionProof) -> Result<bool, TestError> {
Ok(self.data.lock().transactions_to_accept.contains(&proof.0))
}
async fn submit_transaction_proof(&self, proof: TestTransactionProof) -> Result<(), TestError> {
self.data.lock().submitted_proofs.push(proof);
Ok(())
}
}
fn ensure_relay_single_success(source: &TestTransactionsSource, target: &TestTransactionsTarget) {
assert_eq!(
async_std::task::block_on(relay_single_transaction_proof(source, target, test_transaction_hash(0),)),
Ok(()),
);
assert_eq!(
target.data.lock().submitted_proofs,
vec![TestTransactionProof(test_transaction_hash(0))],
);
}
fn ensure_relay_single_failure(source: TestTransactionsSource, target: TestTransactionsTarget) {
assert!(async_std::task::block_on(relay_single_transaction_proof(
&source,
&target,
test_transaction_hash(0),
))
.is_err(),);
assert!(target.data.lock().submitted_proofs.is_empty());
}
#[test]
fn ready_transaction_proof_relayed_immediately() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
ensure_relay_single_success(&source, &target)
}
#[test]
fn relay_transaction_proof_waits_for_transaction_to_be_mined() {
let source = TestTransactionsSource::new(Box::new(|source_data| {
assert_eq!(source_data.transaction_block, Ok(None));
source_data.transaction_block = Ok(Some((test_block_id(), 0)));
}));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
// transaction is not yet mined, but will be available after first wait (tick)
source.data.lock().transaction_block = Ok(None);
ensure_relay_single_success(&source, &target)
}
#[test]
fn relay_transaction_fails_when_transaction_retrieval_fails() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
source.data.lock().transaction_block = Err(TestError(false));
ensure_relay_single_failure(source, target)
}
#[test]
fn relay_transaction_fails_when_proof_retrieval_fails() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
source
.data
.lock()
.proofs_to_fail
.insert(test_transaction_hash(0), TestError(false));
ensure_relay_single_failure(source, target)
}
#[test]
fn relay_transaction_proof_waits_for_header_to_be_imported() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|target_data| {
assert_eq!(target_data.is_header_known, Ok(false));
target_data.is_header_known = Ok(true);
}));
// header is not yet imported, but will be available after first wait (tick)
target.data.lock().is_header_known = Ok(false);
ensure_relay_single_success(&source, &target)
}
#[test]
fn relay_transaction_proof_fails_when_is_header_known_fails() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
target.data.lock().is_header_known = Err(TestError(false));
ensure_relay_single_failure(source, target)
}
#[test]
fn relay_transaction_proof_waits_for_header_to_be_finalized() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|target_data| {
assert_eq!(target_data.is_header_finalized, Ok(false));
target_data.is_header_finalized = Ok(true);
}));
// header is not yet finalized, but will be available after first wait (tick)
target.data.lock().is_header_finalized = Ok(false);
ensure_relay_single_success(&source, &target)
}
#[test]
fn relay_transaction_proof_fails_when_is_header_finalized_fails() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
target.data.lock().is_header_finalized = Err(TestError(false));
ensure_relay_single_failure(source, target)
}
#[test]
fn relay_transaction_proof_fails_when_target_node_rejects_proof() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
target
.data
.lock()
.transactions_to_accept
.remove(&test_transaction_hash(0));
ensure_relay_single_success(&source, &target)
}
fn test_relay_block_transactions(
source: &TestTransactionsSource,
target: &TestTransactionsTarget,
pre_relayed: RelayedBlockTransactions,
) -> Result<RelayedBlockTransactions, RelayedBlockTransactions> {
async_std::task::block_on(relay_block_transactions(
source,
target,
&TestBlock(
test_block_id(),
vec![test_transaction(0), test_transaction(1), test_transaction(2)],
),
pre_relayed,
))
.map_err(|(_, transactions)| transactions)
}
#[test]
fn relay_block_transactions_process_all_transactions() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
// let's only accept tx#1
target
.data
.lock()
.transactions_to_accept
.remove(&test_transaction_hash(0));
target
.data
.lock()
.transactions_to_accept
.insert(test_transaction_hash(1));
let relayed_transactions = test_relay_block_transactions(&source, &target, Default::default());
assert_eq!(
relayed_transactions,
Ok(RelayedBlockTransactions {
processed: 3,
relayed: 1,
failed: 0,
}),
);
assert_eq!(
target.data.lock().submitted_proofs,
vec![TestTransactionProof(test_transaction_hash(1))],
);
}
#[test]
fn relay_block_transactions_ignores_transaction_failure() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
// let's reject proof for tx#0
source
.data
.lock()
.proofs_to_fail
.insert(test_transaction_hash(0), TestError(false));
let relayed_transactions = test_relay_block_transactions(&source, &target, Default::default());
assert_eq!(
relayed_transactions,
Ok(RelayedBlockTransactions {
processed: 3,
relayed: 0,
failed: 1,
}),
);
assert_eq!(target.data.lock().submitted_proofs, vec![],);
}
#[test]
fn relay_block_transactions_fails_on_connection_error() {
let source = TestTransactionsSource::new(Box::new(|_| unreachable!("no ticks allowed")));
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no ticks allowed")));
// fail with connection error when preparing proof for tx#1
source
.data
.lock()
.proofs_to_fail
.insert(test_transaction_hash(1), TestError(true));
let relayed_transactions = test_relay_block_transactions(&source, &target, Default::default());
assert_eq!(
relayed_transactions,
Err(RelayedBlockTransactions {
processed: 1,
relayed: 1,
failed: 0,
}),
);
assert_eq!(
target.data.lock().submitted_proofs,
vec![TestTransactionProof(test_transaction_hash(0))],
);
// now do not fail on tx#2
source.data.lock().proofs_to_fail.clear();
// and also relay tx#3
target
.data
.lock()
.transactions_to_accept
.insert(test_transaction_hash(2));
let relayed_transactions = test_relay_block_transactions(&source, &target, relayed_transactions.unwrap_err());
assert_eq!(
relayed_transactions,
Ok(RelayedBlockTransactions {
processed: 3,
relayed: 2,
failed: 0,
}),
);
assert_eq!(
target.data.lock().submitted_proofs,
vec![
TestTransactionProof(test_transaction_hash(0)),
TestTransactionProof(test_transaction_hash(2))
],
);
}
}
@@ -0,0 +1,331 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying proofs of exchange transactions.
use crate::exchange::{
relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient, TargetClient,
TransactionProofPipeline,
};
use crate::exchange_loop_metrics::ExchangeLoopMetrics;
use backoff::backoff::Backoff;
use futures::{future::FutureExt, select};
use num_traits::One;
use relay_utils::{
metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
retry_backoff, FailedClient, MaybeConnectionError,
};
use std::future::Future;
/// Transactions proofs relay state.
#[derive(Debug)]
pub struct TransactionProofsRelayState<BlockNumber> {
/// Number of last header we have processed so far.
pub best_processed_header_number: BlockNumber,
}
/// Transactions proofs relay storage.
pub trait TransactionProofsRelayStorage: Clone {
/// Associated block number.
type BlockNumber;
/// Get relay state.
fn state(&self) -> TransactionProofsRelayState<Self::BlockNumber>;
/// Update relay state.
fn set_state(&mut self, state: &TransactionProofsRelayState<Self::BlockNumber>);
}
/// In-memory storage for auto-relay loop.
#[derive(Debug, Clone)]
pub struct InMemoryStorage<BlockNumber> {
best_processed_header_number: BlockNumber,
}
impl<BlockNumber> InMemoryStorage<BlockNumber> {
/// Created new in-memory storage with given best processed block number.
pub fn new(best_processed_header_number: BlockNumber) -> Self {
InMemoryStorage {
best_processed_header_number,
}
}
}
impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorage<BlockNumber> {
type BlockNumber = BlockNumber;
fn state(&self) -> TransactionProofsRelayState<BlockNumber> {
TransactionProofsRelayState {
best_processed_header_number: self.best_processed_header_number,
}
}
fn set_state(&mut self, state: &TransactionProofsRelayState<BlockNumber>) {
self.best_processed_header_number = state.best_processed_header_number;
}
}
/// Run proofs synchronization.
pub fn run<P: TransactionProofPipeline>(
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
let exit_signal = exit_signal.shared();
let metrics_global = GlobalMetrics::default();
let metrics_exch = ExchangeLoopMetrics::default();
let metrics_enabled = metrics_params.is_some();
metrics_start(
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME),
metrics_params,
&metrics_global,
&metrics_exch,
);
relay_utils::relay_loop::run(
relay_utils::relay_loop::RECONNECT_DELAY,
source_client,
target_client,
|source_client, target_client| {
run_until_connection_lost(
storage.clone(),
source_client,
target_client,
if metrics_enabled {
Some(metrics_global.clone())
} else {
None
},
if metrics_enabled {
Some(metrics_exch.clone())
} else {
None
},
exit_signal.clone(),
)
},
);
}
/// Run proofs synchronization.
async fn run_until_connection_lost<P: TransactionProofPipeline>(
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_global: Option<GlobalMetrics>,
metrics_exch: Option<ExchangeLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let mut retry_backoff = retry_backoff();
let mut state = storage.state();
let mut current_finalized_block = None;
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
loop {
let iteration_result = run_loop_iteration(
&mut storage,
&source_client,
&target_client,
&mut state,
&mut current_finalized_block,
metrics_exch.as_ref(),
)
.await;
if let Some(ref metrics_global) = metrics_global {
metrics_global.update().await;
}
if let Err((is_connection_error, failed_client)) = iteration_result {
if is_connection_error {
return Err(failed_client);
}
let retry_timeout = retry_backoff
.next_backoff()
.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY);
select! {
_ = async_std::task::sleep(retry_timeout).fuse() => {},
_ = exit_signal => return Ok(()),
}
} else {
retry_backoff.reset();
select! {
_ = source_client.tick().fuse() => {},
_ = exit_signal => return Ok(()),
}
}
}
}
/// Run exchange loop until we need to break.
async fn run_loop_iteration<P: TransactionProofPipeline>(
storage: &mut impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: &impl SourceClient<P>,
target_client: &impl TargetClient<P>,
state: &mut TransactionProofsRelayState<BlockNumberOf<P>>,
current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>,
exchange_loop_metrics: Option<&ExchangeLoopMetrics>,
) -> Result<(), (bool, FailedClient)> {
let best_finalized_header_id = match target_client.best_finalized_header_id().await {
Ok(best_finalized_header_id) => {
log::debug!(
target: "bridge",
"Got best finalized {} block from {} node: {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_finalized_header_id,
);
best_finalized_header_id
}
Err(err) => {
log::error!(
target: "bridge",
"Failed to retrieve best {} header id from {} node: {:?}. Going to retry...",
P::SOURCE_NAME,
P::TARGET_NAME,
err,
);
return Err((err.is_connection_error(), FailedClient::Target));
}
};
loop {
// if we already have some finalized block body, try to relay its transactions
if let Some((block, relayed_transactions)) = current_finalized_block.take() {
let result = relay_block_transactions(source_client, target_client, &block, relayed_transactions).await;
match result {
Ok(relayed_transactions) => {
log::info!(
target: "bridge",
"Relay has processed {} block #{}. Total/Relayed/Failed transactions: {}/{}/{}",
P::SOURCE_NAME,
state.best_processed_header_number,
relayed_transactions.processed,
relayed_transactions.relayed,
relayed_transactions.failed,
);
state.best_processed_header_number = state.best_processed_header_number + One::one();
storage.set_state(state);
if let Some(ref exchange_loop_metrics) = exchange_loop_metrics {
exchange_loop_metrics.update::<P>(
state.best_processed_header_number,
best_finalized_header_id.0,
relayed_transactions,
);
}
// we have just updated state => proceed to next block retrieval
}
Err((failed_client, relayed_transactions)) => {
*current_finalized_block = Some((block, relayed_transactions));
return Err((true, failed_client));
}
}
}
// we may need to retrieve finalized block body from source node
if best_finalized_header_id.0 > state.best_processed_header_number {
let next_block_number = state.best_processed_header_number + One::one();
let result = source_client.block_by_number(next_block_number).await;
match result {
Ok(block) => {
*current_finalized_block = Some((block, RelayedBlockTransactions::default()));
// we have received new finalized block => go back to relay its transactions
continue;
}
Err(err) => {
log::error!(
target: "bridge",
"Failed to retrieve canonical block #{} from {} node: {:?}. Going to retry...",
next_block_number,
P::SOURCE_NAME,
err,
);
return Err((err.is_connection_error(), FailedClient::Source));
}
}
}
// there are no any transactions we need to relay => wait for new data
return Ok(());
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::exchange::tests::{
test_next_block, test_next_block_id, test_transaction_hash, TestTransactionProof, TestTransactionsSource,
TestTransactionsTarget,
};
use futures::{future::FutureExt, stream::StreamExt};
#[test]
fn exchange_loop_is_able_to_relay_proofs() {
let storage = InMemoryStorage {
best_processed_header_number: 0,
};
let target = TestTransactionsTarget::new(Box::new(|_| unreachable!("no target ticks allowed")));
let target_data = target.data.clone();
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
let source = TestTransactionsSource::new(Box::new(move |data| {
let transaction1_relayed = target_data
.lock()
.submitted_proofs
.contains(&TestTransactionProof(test_transaction_hash(0)));
let transaction2_relayed = target_data
.lock()
.submitted_proofs
.contains(&TestTransactionProof(test_transaction_hash(1)));
match (transaction1_relayed, transaction2_relayed) {
(true, true) => exit_sender.unbounded_send(()).unwrap(),
(true, false) => {
data.block = Ok(test_next_block());
target_data.lock().best_finalized_header_id = Ok(test_next_block_id());
target_data
.lock()
.transactions_to_accept
.insert(test_transaction_hash(1));
}
_ => (),
}
}));
run(
storage,
source,
target,
None,
exit_receiver.into_future().map(|(_, _)| ()),
);
}
}
@@ -0,0 +1,88 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Metrics for currency-exchange relay loop.
use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
use relay_utils::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};
/// Exchange transactions relay metrics.
#[derive(Clone)]
pub struct ExchangeLoopMetrics {
/// Best finalized block numbers - "processed" and "known".
best_block_numbers: GaugeVec<U64>,
/// Number of processed blocks ("total").
processed_blocks: Counter<U64>,
/// Number of processed transactions ("total", "relayed" and "failed").
processed_transactions: CounterVec<U64>,
}
impl Metrics for ExchangeLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.processed_blocks.clone(), registry).map_err(|e| e.to_string())?;
register(self.processed_transactions.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
impl Default for ExchangeLoopMetrics {
fn default() -> Self {
ExchangeLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best finalized block numbers"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
processed_blocks: Counter::new("processed_blocks", "Total number of processed blocks")
.expect("metric is static and thus valid; qed"),
processed_transactions: CounterVec::new(
Opts::new("processed_transactions", "Total number of processed transactions"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
}
}
}
impl ExchangeLoopMetrics {
/// Update metrics when single block is relayed.
pub fn update<P: TransactionProofPipeline>(
&self,
best_processed_block_number: BlockNumberOf<P>,
best_known_block_number: BlockNumberOf<P>,
relayed_transactions: RelayedBlockTransactions,
) {
self.best_block_numbers
.with_label_values(&["processed"])
.set(best_processed_block_number.into());
self.best_block_numbers
.with_label_values(&["known"])
.set(best_known_block_number.into());
self.processed_blocks.inc();
self.processed_transactions
.with_label_values(&["total"])
.inc_by(relayed_transactions.processed as _);
self.processed_transactions
.with_label_values(&["relayed"])
.inc_by(relayed_transactions.relayed as _);
self.processed_transactions
.with_label_values(&["failed"])
.inc_by(relayed_transactions.failed as _);
}
}
@@ -0,0 +1,26 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying [`currency-exchange`](../pallet_bridge_currency_exchange/index.html) application
//! specific data. Currency exchange application allows exchanging tokens between bridged chains.
//! This module provides entrypoints for crafting and submitting (single and multiple)
//! proof-of-exchange-at-source-chain transaction(s) to target chain.
#![warn(missing_docs)]
pub mod exchange;
pub mod exchange_loop;
pub mod exchange_loop_metrics;