Make relay generic over source/target chains (#58)

* renamed to-be-generic files

* make everything required generic over source/target chains

* some more fixes

* cargo fmt --all

* trait functions -> trait constants

* cargo --fmt --all
This commit is contained in:
Svyatoslav Nikolsky
2020-04-08 03:48:15 +03:00
committed by Bastian Köcher
parent c6c46462ab
commit d631178e41
11 changed files with 1305 additions and 917 deletions
+1
View File
@@ -15,6 +15,7 @@ futures = "0.3.1"
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee.git", default-features = false, features = ["http"] }
linked-hash-map = "0.5.2"
log = "0.4.8"
num-traits = "0.2"
parking_lot = "0.10.0"
rustc-hex = "2.0.1"
serde = { version = "1.0.106", features = ["derive"] }
+41 -40
View File
@@ -2,43 +2,44 @@ name: ethsub-bridge
version: "0.1.0"
author: Parity Technologies <admin@parity.io>
about: Parity Ethereum (PoA) <-> Substrate bridge
args:
- eth-host:
long: eth-host
value_name: ETH_HOST
help: Connect to Ethereum node at given host.
takes_value: true
- eth-port:
long: eth-port
value_name: ETH_PORT
help: Connect to Ethereum node at given port.
takes_value: true
- sub-host:
long: sub-host
value_name: SUB_HOST
help: Connect to Substrate node at given host.
takes_value: true
- sub-port:
long: sub-port
value_name: SUB_PORT
help: Connect to Substrate node at given port.
takes_value: true
- sub-tx-mode:
long: sub-tx-mode
value_name: MODE
help: Submit headers using signed (default) or unsigned transactions. Third mode - backup - submits signed transactions only when we believe that sync has stalled.
takes_value: true
possible_values:
- signed
- unsigned
- backup
- sub-signer:
long: sub-signer
value_name: SUB_SIGNER
help: The SURI of secret key to use when transactions are submitted to the Substrate node.
takes_value: true
- sub-signer-password:
long: sub-signer-password
value_name: SUB_SIGNER_PASSWORD
help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node.
takes_value: true
subcommands:
- eth-to-sub:
about: Synchronize headers from Ethereum node to Substrate node.
args:
- eth-host:
long: eth-host
value_name: ETH_HOST
help: Connect to Ethereum node at given host.
takes_value: true
- eth-port:
long: eth-port
value_name: ETH_PORT
help: Connect to Ethereum node at given port.
takes_value: true
- sub-host:
long: sub-host
value_name: SUB_HOST
help: Connect to Substrate node at given host.
takes_value: true
- sub-port:
long: sub-port
value_name: SUB_PORT
help: Connect to Substrate node at given port.
takes_value: true
- sub-tx-mode:
long: sub-tx-mode
value_name: MODE
help: Submit headers using signed (default) or unsigned transactions. Third mode - backup - submits signed transactions only when we believe that sync has stalled.
takes_value: true
possible_values:
- signed
- unsigned
- backup
- sub-signer:
long: sub-signer
value_name: SUB_SIGNER
help: The SURI of secret key to use when transactions are submitted to the Substrate node.
- sub-signer-password:
long: sub-signer-password
value_name: SUB_SIGNER_PASSWORD
help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node.
@@ -14,8 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::ethereum_sync_loop::MaybeConnectionError;
use crate::ethereum_types::{Header, HeaderId, Receipt, H256, U64};
use crate::ethereum_types::{EthereumHeaderId, Header, Receipt, H256, U64};
use crate::sync_types::MaybeConnectionError;
use jsonrpsee::common::Params;
use jsonrpsee::raw::{RawClient, RawClientError};
use jsonrpsee::transport::http::{HttpTransportClient, RequestError};
@@ -117,9 +117,9 @@ pub async fn header_by_hash(client: Client, hash: H256) -> (Client, Result<Heade
/// Retrieve transactions receipts for given block.
pub async fn transactions_receipts(
mut client: Client,
id: HeaderId,
id: EthereumHeaderId,
transacactions: Vec<H256>,
) -> (Client, Result<(HeaderId, Vec<Receipt>), Error>) {
) -> (Client, Result<(EthereumHeaderId, Vec<Receipt>), Error>) {
let mut transactions_receipts = Vec::with_capacity(transacactions.len());
for transacaction in transacactions {
let (next_client, transaction_receipt) = transaction_receipt(client, transacaction).await;
+153 -442
View File
@@ -15,40 +15,18 @@
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::ethereum_client;
use crate::ethereum_types::HeaderStatus as EthereumHeaderStatus;
use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt};
use crate::substrate_client;
use futures::{future::FutureExt, stream::StreamExt};
// TODO: when SharedClient will be available, switch to Substrate headers subscription
// (because we do not need old Substrate headers)
use crate::sync::{HeadersSyncParams, TargetTransactionMode};
use crate::sync_loop::{SourceClient, TargetClient};
use futures::future::FutureExt;
use std::{future::Future, pin::Pin};
pub use web3::types::H256;
/// Interval (in ms) at which we check new Ethereum headers when we are synced/almost synced.
const ETHEREUM_TICK_INTERVAL_MS: u64 = 10_000;
/// Interval (in ms) at which we check new Substrate blocks.
const SUBSTRATE_TICK_INTERVAL_MS: u64 = 5_000;
/// When we submit Ethereum headers to Substrate runtime, but see no updates of best
/// Ethereum block known to Substrate runtime during STALL_SYNC_TIMEOUT_MS milliseconds,
/// we consider that our headers are rejected because there has been reorg in Substrate.
/// This reorg could invalidate our knowledge about sync process (i.e. we have asked if
/// HeaderA is known to Substrate, but then reorg happened and the answer is different
/// now) => we need to reset sync.
/// The other option is to receive **EVERY** best Substrate header and check if it is
/// direct child of previous best header. But: (1) subscription doesn't guarantee that
/// the subscriber will receive every best header (2) reorg won't always lead to sync
/// stall and restart is a heavy operation (we forget all in-memory headers).
const STALL_SYNC_TIMEOUT_MS: u64 = 30_000;
/// Delay (in milliseconds) after we have seen update of best Ethereum header in Substrate,
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000;
/// Delay (in milliseconds) after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY_MS: u64 = 10_000;
/// Error type that can signal connection errors.
pub trait MaybeConnectionError {
/// Returns true if error (maybe) represents connection error.
fn is_connection_error(&self) -> bool;
}
/// Ethereum synchronization parameters.
pub struct EthereumSyncParams {
@@ -60,33 +38,10 @@ pub struct EthereumSyncParams {
pub sub_host: String,
/// Substrate RPC port.
pub sub_port: u16,
/// Substrate transactions submission mode.
pub sub_tx_mode: SubstrateTransactionMode,
/// Substrate transactions signer.
pub sub_signer: sp_core::sr25519::Pair,
/// Maximal number of ethereum headers to pre-download.
pub max_future_headers_to_download: usize,
/// Maximal number of active (we believe) submit header transactions.
pub max_headers_in_submitted_status: usize,
/// Maximal number of headers in single submit request.
pub max_headers_in_single_submit: usize,
/// Maximal total headers size in single submit request.
pub max_headers_size_in_single_submit: usize,
/// We only may store and accept (from Ethereum node) headers that have
/// number >= than best_substrate_header.number - prune_depth.
pub prune_depth: u64,
}
/// Substrate transaction mode.
#[derive(Debug, PartialEq)]
pub enum SubstrateTransactionMode {
/// Submit eth headers using signed substrate transactions.
Signed,
/// Submit eth headers using unsigned substrate transactions.
Unsigned,
/// Submit eth headers using signed substrate transactions, but only when we
/// believe that sync has stalled.
Backup,
/// Synchronization parameters.
pub sync_params: HeadersSyncParams,
}
impl std::fmt::Debug for EthereumSyncParams {
@@ -96,15 +51,7 @@ impl std::fmt::Debug for EthereumSyncParams {
.field("eth_port", &self.eth_port)
.field("sub_host", &self.sub_port)
.field("sub_port", &self.sub_port)
.field("sub_tx_mode", &self.sub_tx_mode)
.field("max_future_headers_to_download", &self.max_future_headers_to_download)
.field("max_headers_in_submitted_status", &self.max_headers_in_submitted_status)
.field("max_headers_in_single_submit", &self.max_headers_in_single_submit)
.field(
"max_headers_size_in_single_submit",
&self.max_headers_size_in_single_submit,
)
.field("prune_depth", &self.prune_depth)
.field("sync_params", &self.sync_params)
.finish()
}
}
@@ -116,394 +63,158 @@ impl Default for EthereumSyncParams {
eth_port: 8545,
sub_host: "localhost".into(),
sub_port: 9933,
sub_tx_mode: SubstrateTransactionMode::Signed,
sub_signer: sp_keyring::AccountKeyring::Alice.pair(),
max_future_headers_to_download: 128,
max_headers_in_submitted_status: 128,
max_headers_in_single_submit: 32,
max_headers_size_in_single_submit: 131_072,
prune_depth: 4096,
sync_params: Default::default(),
}
}
}
/// Ethereum client as headers source.
struct EthereumHeadersSource {
/// Ethereum node client.
client: ethereum_client::Client,
}
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
type Error = ethereum_client::Error;
type BestBlockNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<u64, Self::Error>)>>>;
type HeaderByHashFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>;
type HeaderByNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>;
type HeaderExtraFuture =
Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, Vec<Receipt>), Self::Error>)>>>;
fn best_block_number(self) -> Self::BestBlockNumberFuture {
ethereum_client::best_block_number(self.client)
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
}
fn header_by_hash(self, hash: H256) -> Self::HeaderByHashFuture {
ethereum_client::header_by_hash(self.client, hash)
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
}
fn header_by_number(self, number: u64) -> Self::HeaderByNumberFuture {
ethereum_client::header_by_number(self.client, number)
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
}
fn header_extra(self, id: EthereumHeaderId, header: &Header) -> Self::HeaderExtraFuture {
ethereum_client::transactions_receipts(self.client, id, header.transactions.clone())
.map(|(client, result)| (EthereumHeadersSource { client }, result))
.boxed()
}
}
/// Substrate client as Ethereum headers target.
struct SubstrateHeadersTarget {
/// Substrate node client.
client: substrate_client::Client,
/// Substrate transactions signer.
signer: sp_core::sr25519::Pair,
/// Whether we want to submit signed (true), or unsigned (false) transactions.
sign_transactions: bool,
}
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
type Error = substrate_client::Error;
type BestHeaderIdFuture = Pin<Box<dyn Future<Output = (Self, Result<EthereumHeaderId, Self::Error>)>>>;
type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, bool), Self::Error>)>>>;
type RequiresExtraFuture = Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, bool), Self::Error>)>>>;
type SubmitHeadersFuture = Pin<Box<dyn Future<Output = (Self, Result<Vec<EthereumHeaderId>, Self::Error>)>>>;
fn best_header_id(self) -> Self::BestHeaderIdFuture {
let (signer, sign_transactions) = (self.signer, self.sign_transactions);
substrate_client::best_ethereum_block(self.client)
.map(move |(client, result)| {
(
SubstrateHeadersTarget {
client,
signer,
sign_transactions,
},
result,
)
})
.boxed()
}
fn is_known_header(self, id: EthereumHeaderId) -> Self::IsKnownHeaderFuture {
let (signer, sign_transactions) = (self.signer, self.sign_transactions);
substrate_client::ethereum_header_known(self.client, id)
.map(move |(client, result)| {
(
SubstrateHeadersTarget {
client,
signer,
sign_transactions,
},
result,
)
})
.boxed()
}
fn requires_extra(self, header: &QueuedEthereumHeader) -> Self::RequiresExtraFuture {
// we can minimize number of receipts_check calls by checking header
// logs bloom here, but it may give us false positives (when authorities
// source is contract, we never need any logs)
let (signer, sign_transactions) = (self.signer, self.sign_transactions);
substrate_client::ethereum_receipts_required(self.client, header.clone())
.map(move |(client, result)| {
(
SubstrateHeadersTarget {
client,
signer,
sign_transactions,
},
result,
)
})
.boxed()
}
fn submit_headers(self, headers: Vec<QueuedEthereumHeader>) -> Self::SubmitHeadersFuture {
let (signer, sign_transactions) = (self.signer, self.sign_transactions);
substrate_client::submit_ethereum_headers(self.client, signer.clone(), headers, sign_transactions)
.map(move |(client, result)| {
(
SubstrateHeadersTarget {
client,
signer,
sign_transactions,
},
result.map(|(_, submitted_headers)| submitted_headers),
)
})
.boxed()
}
}
/// Run Ethereum headers synchronization.
pub fn run(params: EthereumSyncParams) {
let mut local_pool = futures::executor::LocalPool::new();
let mut progress_context = (std::time::Instant::now(), None, None);
let sign_sub_transactions = match params.sub_tx_mode {
SubstrateTransactionMode::Signed | SubstrateTransactionMode::Backup => true,
SubstrateTransactionMode::Unsigned => false,
let eth_uri = format!("http://{}:{}", params.eth_host, params.eth_port);
let eth_client = ethereum_client::client(&eth_uri);
let sub_uri = format!("http://{}:{}", params.sub_host, params.sub_port);
let sub_client = substrate_client::client(&sub_uri);
let sub_signer = params.sub_signer;
let sign_sub_transactions = match params.sync_params.target_tx_mode {
TargetTransactionMode::Signed | TargetTransactionMode::Backup => true,
TargetTransactionMode::Unsigned => false,
};
local_pool.run_until(async move {
let eth_uri = format!("http://{}:{}", params.eth_host, params.eth_port);
let sub_uri = format!("http://{}:{}", params.sub_host, params.sub_port);
let sub_signer = params.sub_signer.clone();
let mut eth_sync = crate::ethereum_sync::HeadersSync::new(params);
let mut stall_countdown = None;
let mut last_update_time = std::time::Instant::now();
let mut eth_maybe_client = None;
let mut eth_best_block_number_required = false;
let eth_best_block_number_future = ethereum_client::best_block_number(ethereum_client::client(&eth_uri)).fuse();
let eth_new_header_future = futures::future::Fuse::terminated();
let eth_orphan_header_future = futures::future::Fuse::terminated();
let eth_receipts_future = futures::future::Fuse::terminated();
let eth_go_offline_future = futures::future::Fuse::terminated();
let eth_tick_stream = interval(ETHEREUM_TICK_INTERVAL_MS).fuse();
let mut sub_maybe_client = None;
let mut sub_best_block_required = false;
let sub_best_block_future =
substrate_client::best_ethereum_block(substrate_client::client(&sub_uri, sub_signer)).fuse();
let sub_receipts_check_future = futures::future::Fuse::terminated();
let sub_existence_status_future = futures::future::Fuse::terminated();
let sub_submit_header_future = futures::future::Fuse::terminated();
let sub_go_offline_future = futures::future::Fuse::terminated();
let sub_tick_stream = interval(SUBSTRATE_TICK_INTERVAL_MS).fuse();
futures::pin_mut!(
eth_best_block_number_future,
eth_new_header_future,
eth_orphan_header_future,
eth_receipts_future,
eth_go_offline_future,
eth_tick_stream,
sub_best_block_future,
sub_receipts_check_future,
sub_existence_status_future,
sub_submit_header_future,
sub_go_offline_future,
sub_tick_stream
);
loop {
futures::select! {
(eth_client, eth_best_block_number) = eth_best_block_number_future => {
eth_best_block_number_required = false;
process_future_result(
&mut eth_maybe_client,
eth_client,
eth_best_block_number,
|eth_best_block_number| eth_sync.ethereum_best_header_number_response(eth_best_block_number),
&mut eth_go_offline_future,
|eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client),
"Error retrieving best header number from Ethereum number",
);
},
(eth_client, eth_new_header) = eth_new_header_future => {
process_future_result(
&mut eth_maybe_client,
eth_client,
eth_new_header,
|eth_new_header| eth_sync.headers_mut().header_response(eth_new_header),
&mut eth_go_offline_future,
|eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client),
"Error retrieving header from Ethereum node",
);
},
(eth_client, eth_orphan_header) = eth_orphan_header_future => {
process_future_result(
&mut eth_maybe_client,
eth_client,
eth_orphan_header,
|eth_orphan_header| eth_sync.headers_mut().header_response(eth_orphan_header),
&mut eth_go_offline_future,
|eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client),
"Error retrieving orphan header from Ethereum node",
);
},
(eth_client, eth_receipts) = eth_receipts_future => {
process_future_result(
&mut eth_maybe_client,
eth_client,
eth_receipts,
|(header, receipts)| eth_sync.headers_mut().receipts_response(&header, receipts),
&mut eth_go_offline_future,
|eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client),
"Error retrieving transactions receipts from Ethereum node",
);
},
eth_client = eth_go_offline_future => {
eth_maybe_client = Some(eth_client);
},
_ = eth_tick_stream.next() => {
if eth_sync.is_almost_synced() {
eth_best_block_number_required = true;
}
},
(sub_client, sub_best_block) = sub_best_block_future => {
sub_best_block_required = false;
process_future_result(
&mut sub_maybe_client,
sub_client,
sub_best_block,
|sub_best_block| {
let head_updated = eth_sync.substrate_best_header_response(sub_best_block);
if head_updated {
last_update_time = std::time::Instant::now();
}
match head_updated {
// IF head is updated AND there are still our transactions:
// => restart stall countdown timer
true if eth_sync.headers().headers_in_status(EthereumHeaderStatus::Submitted) != 0 =>
stall_countdown = Some(std::time::Instant::now()),
// IF head is updated AND there are no our transactions:
// => stop stall countdown timer
true => stall_countdown = None,
// IF head is not updated AND stall countdown is not yet completed
// => do nothing
false if stall_countdown
.map(|stall_countdown| std::time::Instant::now() - stall_countdown <
std::time::Duration::from_millis(STALL_SYNC_TIMEOUT_MS))
.unwrap_or(true)
=> (),
// IF head is not updated AND stall countdown has completed
// => restart sync
false => {
log::info!(
target: "bridge",
"Possible Substrate fork detected. Restarting Ethereum headers synchronization.",
);
stall_countdown = None;
eth_sync.restart();
},
}
},
&mut sub_go_offline_future,
|sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client),
"Error retrieving best known header from Substrate node",
);
},
(sub_client, sub_existence_status) = sub_existence_status_future => {
process_future_result(
&mut sub_maybe_client,
sub_client,
sub_existence_status,
|(sub_header, sub_existence_status)| eth_sync
.headers_mut()
.maybe_orphan_response(&sub_header, sub_existence_status),
&mut sub_go_offline_future,
|sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client),
"Error retrieving existence status from Substrate node",
);
},
(sub_client, sub_submit_header_result) = sub_submit_header_future => {
process_future_result(
&mut sub_maybe_client,
sub_client,
sub_submit_header_result,
|(_, submitted_headers)| eth_sync.headers_mut().headers_submitted(submitted_headers),
&mut sub_go_offline_future,
|sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client),
"Error submitting headers to Substrate node",
);
},
(sub_client, sub_receipts_check_result) = sub_receipts_check_future => {
// we can minimize number of receipts_check calls by checking header
// logs bloom here, but it may give us false positives (when authorities
// source is contract, we never need any logs)
process_future_result(
&mut sub_maybe_client,
sub_client,
sub_receipts_check_result,
|(header, receipts_check_result)| eth_sync
.headers_mut()
.maybe_receipts_response(&header, receipts_check_result),
&mut sub_go_offline_future,
|sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client),
"Error retrieving receipts requirement from Substrate node",
);
},
sub_client = sub_go_offline_future => {
sub_maybe_client = Some(sub_client);
},
_ = sub_tick_stream.next() => {
sub_best_block_required = true;
},
}
// print progress
progress_context = print_progress(progress_context, &eth_sync);
// if client is available: wait, or call Substrate RPC methods
if let Some(sub_client) = sub_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely;
// 2) check transactions receipts - it stops us from downloading/submitting new blocks;
// 3) check existence - it stops us from submitting new blocks;
// 4) submit header
if sub_best_block_required {
log::debug!(target: "bridge", "Asking Substrate about best block");
sub_best_block_future.set(substrate_client::best_ethereum_block(sub_client).fuse());
} else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::MaybeReceipts) {
log::debug!(
target: "bridge",
"Checking if header submission requires receipts: {:?}",
header.id(),
);
let header = header.clone();
sub_receipts_check_future
.set(substrate_client::ethereum_receipts_required(sub_client, header).fuse());
} else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::MaybeOrphan) {
// for MaybeOrphan we actually ask for parent' header existence
let parent_id = header.parent_id();
log::debug!(
target: "bridge",
"Asking Substrate node for existence of: {:?}",
parent_id,
);
sub_existence_status_future
.set(substrate_client::ethereum_header_known(sub_client, parent_id).fuse());
} else if let Some(headers) = eth_sync.select_headers_to_submit(
last_update_time.elapsed() > std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS),
) {
let ids = match headers.len() {
1 => format!("{:?}", headers[0].id()),
2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()),
len => format!("[{:?} ... {:?}]", headers[0].id(), headers[len - 1].id()),
};
log::debug!(
target: "bridge",
"Submitting {} header(s) to Substrate node: {:?}",
headers.len(),
ids,
);
let headers = headers.into_iter().cloned().collect();
sub_submit_header_future.set(
substrate_client::submit_ethereum_headers(sub_client, headers, sign_sub_transactions).fuse(),
);
// remember that we have submitted some headers
if stall_countdown.is_none() {
stall_countdown = Some(std::time::Instant::now());
}
} else {
sub_maybe_client = Some(sub_client);
}
}
// if client is available: wait, or call Ethereum RPC methods
if let Some(eth_client) = eth_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading new blocks + we call it rarely;
// 2) check transactions receipts - it stops us from downloading/submitting new blocks;
// 3) check existence - it stops us from submitting new blocks;
// 4) submit header
if eth_best_block_number_required {
log::debug!(target: "bridge", "Asking Ethereum node about best block number");
eth_best_block_number_future.set(ethereum_client::best_block_number(eth_client).fuse());
} else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::Receipts) {
let id = header.id();
log::debug!(
target: "bridge",
"Retrieving receipts for header: {:?}",
id,
);
eth_receipts_future.set(
ethereum_client::transactions_receipts(eth_client, id, header.header().transactions.clone())
.fuse(),
);
} else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::Orphan) {
// for Orphan we actually ask for parent' header
let parent_id = header.parent_id();
log::debug!(
target: "bridge",
"Going to download orphan header from Ethereum node: {:?}",
parent_id,
);
eth_orphan_header_future.set(ethereum_client::header_by_hash(eth_client, parent_id.1).fuse());
} else if let Some(id) = eth_sync.select_new_header_to_download() {
log::debug!(
target: "bridge",
"Going to download new header from Ethereum node: {:?}",
id,
);
eth_new_header_future.set(ethereum_client::header_by_number(eth_client, id).fuse());
} else {
eth_maybe_client = Some(eth_client);
}
}
}
});
}
fn print_progress(
progress_context: (std::time::Instant, Option<u64>, Option<u64>),
eth_sync: &crate::ethereum_sync::HeadersSync,
) -> (std::time::Instant, Option<u64>, Option<u64>) {
let (prev_time, prev_best_header, prev_target_header) = progress_context;
let now_time = std::time::Instant::now();
let (now_best_header, now_target_header) = eth_sync.status();
let need_update = now_time - prev_time > std::time::Duration::from_secs(10)
|| match (prev_best_header, now_best_header) {
(Some(prev_best_header), Some(now_best_header)) => now_best_header.0.saturating_sub(prev_best_header) > 10,
_ => false,
};
if !need_update {
return (prev_time, prev_best_header, prev_target_header);
}
log::info!(
target: "bridge",
"Synced {:?} of {:?} headers",
now_best_header.map(|id| id.0),
now_target_header,
crate::sync_loop::run(
EthereumHeadersSource { client: eth_client },
ETHEREUM_TICK_INTERVAL_MS,
SubstrateHeadersTarget {
client: sub_client,
signer: sub_signer,
sign_transactions: sign_sub_transactions,
},
SUBSTRATE_TICK_INTERVAL_MS,
params.sync_params,
);
(now_time, now_best_header.clone().map(|id| id.0), *now_target_header)
}
async fn delay<T>(timeout_ms: u64, retval: T) -> T {
async_std::task::sleep(std::time::Duration::from_millis(timeout_ms)).await;
retval
}
fn interval(timeout_ms: u64) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
delay(timeout_ms, ()).await;
Some(((), ()))
})
}
fn process_future_result<TClient, TResult, TError, TGoOfflineFuture>(
maybe_client: &mut Option<TClient>,
client: TClient,
result: Result<TResult, TError>,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce(TClient) -> TGoOfflineFuture,
error_pattern: &'static str,
) where
TError: std::fmt::Debug + MaybeConnectionError,
TGoOfflineFuture: FutureExt,
{
match result {
Ok(result) => {
*maybe_client = Some(client);
on_success(result);
}
Err(error) => {
if error.is_connection_error() {
go_offline_future.set(go_offline(client).fuse());
} else {
*maybe_client = Some(client);
}
log::error!(target: "bridge", "{}: {:?}", error_pattern, error);
}
}
}
+33 -70
View File
@@ -14,6 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader};
use codec::Encode;
pub use web3::types::{Bytes, H256, U128, U64};
/// When header is just received from the Ethereum node, we check that it has
@@ -30,84 +34,43 @@ pub type Header = web3::types::Block<H256>;
/// Ethereum transaction receipt type.
pub type Receipt = web3::types::TransactionReceipt;
/// Ethereum header Id.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct HeaderId(pub u64, pub H256);
/// Ethereum header ID.
pub type EthereumHeaderId = HeaderId<H256, u64>;
impl From<&Header> for HeaderId {
fn from(header: &Header) -> HeaderId {
HeaderId(
header.number.expect(HEADER_ID_PROOF).as_u64(),
header.hash.expect(HEADER_ID_PROOF),
)
}
}
/// Queued ethereum header ID.
pub type QueuedEthereumHeader = QueuedHeader<EthereumHeadersSyncPipeline>;
/// Ethereum header synchronization status.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum HeaderStatus {
/// Header is unknown.
Unknown,
/// Header is in MaybeOrphan queue.
MaybeOrphan,
/// Header is in Orphan queue.
Orphan,
/// Header is in MaybeReceipts queue.
MaybeReceipts,
/// Header is in Receipts queue.
Receipts,
/// Header is in Ready queue.
Ready,
/// Header has been recently submitted to the Substrate runtime.
Submitted,
/// Header is known to the Substrate runtime.
Synced,
}
#[derive(Clone, Debug, Default)]
/// Ethereum synchronization pipeline.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct QueuedHeader {
header: Header,
receipts: Option<Vec<Receipt>>,
pub struct EthereumHeadersSyncPipeline;
impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {
const SOURCE_NAME: &'static str = "Ethereum";
const TARGET_NAME: &'static str = "Substrate";
type Hash = H256;
type Number = u64;
type Header = Header;
type Extra = Vec<Receipt>;
fn estimate_size(source: &QueuedHeader<Self>) -> usize {
into_substrate_ethereum_header(source.header()).encode().len()
+ into_substrate_ethereum_receipts(source.extra())
.map(|extra| extra.encode().len())
.unwrap_or(0)
}
}
impl QueuedHeader {
/// Creates new queued header.
pub fn new(header: Header) -> Self {
QueuedHeader { header, receipts: None }
}
/// Returns ID of header.
pub fn id(&self) -> HeaderId {
(&self.header).into()
}
/// Returns ID of parent header.
pub fn parent_id(&self) -> HeaderId {
impl SourceHeader<H256, u64> for Header {
fn id(&self) -> EthereumHeaderId {
HeaderId(
self.header.number.expect(HEADER_ID_PROOF).as_u64() - 1,
self.header.parent_hash,
self.number.expect(HEADER_ID_PROOF).as_u64(),
self.hash.expect(HEADER_ID_PROOF),
)
}
/// Returns reference to header.
pub fn header(&self) -> &Header {
&self.header
}
/// Returns reference to transactions receipts.
pub fn receipts(&self) -> &Option<Vec<Receipt>> {
&self.receipts
}
/// Extract header and receipts from self.
pub fn extract(self) -> (Header, Option<Vec<Receipt>>) {
(self.header, self.receipts)
}
/// Set associated transaction receipts.
pub fn set_receipts(mut self, receipts: Vec<Receipt>) -> Self {
self.receipts = Some(receipts);
self
fn parent_id(&self) -> EthereumHeaderId {
HeaderId(self.number.expect(HEADER_ID_PROOF).as_u64() - 1, self.parent_hash)
}
}
+26 -16
View File
@@ -17,12 +17,14 @@
#![recursion_limit = "1024"]
mod ethereum_client;
mod ethereum_headers;
mod ethereum_sync;
mod ethereum_sync_loop;
mod ethereum_types;
mod headers;
mod substrate_client;
mod substrate_types;
mod sync;
mod sync_loop;
mod sync_types;
use sp_core::crypto::Pair;
use std::io::Write;
@@ -30,13 +32,24 @@ use std::io::Write;
fn main() {
initialize();
ethereum_sync_loop::run(match ethereum_sync_params() {
Ok(ethereum_sync_params) => ethereum_sync_params,
Err(err) => {
log::error!(target: "bridge", "Error parsing parameters: {}", err);
let yaml = clap::load_yaml!("cli.yml");
let matches = clap::App::from_yaml(yaml).get_matches();
match matches.subcommand() {
("eth-to-sub", Some(eth_to_sub_matches)) => {
ethereum_sync_loop::run(match ethereum_sync_params(&eth_to_sub_matches) {
Ok(ethereum_sync_params) => ethereum_sync_params,
Err(err) => {
log::error!(target: "bridge", "Error parsing parameters: {}", err);
return;
}
});
}
("", _) => {
log::error!(target: "bridge", "No subcommand specified");
return;
}
});
_ => unreachable!("all possible subcommands are checked above; qed"),
}
}
fn initialize() {
@@ -76,10 +89,7 @@ fn initialize() {
builder.init();
}
fn ethereum_sync_params() -> Result<ethereum_sync_loop::EthereumSyncParams, String> {
let yaml = clap::load_yaml!("cli.yml");
let matches = clap::App::from_yaml(yaml).get_matches();
fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<ethereum_sync_loop::EthereumSyncParams, String> {
let mut eth_sync_params = ethereum_sync_loop::EthereumSyncParams::default();
if let Some(eth_host) = matches.value_of("eth-host") {
eth_sync_params.eth_host = eth_host.into();
@@ -100,16 +110,16 @@ fn ethereum_sync_params() -> Result<ethereum_sync_loop::EthereumSyncParams, Stri
}
match matches.value_of("sub-tx-mode") {
Some("signed") => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Signed,
Some("signed") => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
Some("unsigned") => {
eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Unsigned;
eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Unsigned;
// tx pool won't accept too much unsigned transactions
eth_sync_params.max_headers_in_submitted_status = 10;
eth_sync_params.sync_params.max_headers_in_submitted_status = 10;
}
Some("backup") => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Backup,
Some("backup") => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Backup,
Some(mode) => return Err(format!("Invalid sub-tx-mode: {}", mode)),
None => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Signed,
None => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
}
Ok(eth_sync_params)
+10 -12
View File
@@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::ethereum_sync_loop::MaybeConnectionError;
use crate::ethereum_types::{Bytes, HeaderId as EthereumHeaderId, QueuedHeader as QueuedEthereumHeader, H256};
use crate::ethereum_types::{Bytes, EthereumHeaderId, QueuedEthereumHeader, H256};
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts, TransactionHash};
use crate::sync_types::{HeaderId, MaybeConnectionError, SourceHeader};
use codec::{Decode, Encode};
use jsonrpsee::common::Params;
use jsonrpsee::raw::{RawClient, RawClientError};
@@ -29,8 +29,6 @@ use sp_runtime::traits::IdentifyAccount;
pub struct Client {
/// Substrate RPC client.
rpc_client: RawClient<HttpTransportClient>,
/// Transactions signer.
signer: sp_core::sr25519::Pair,
/// Genesis block hash.
genesis_hash: Option<H256>,
}
@@ -58,11 +56,10 @@ impl MaybeConnectionError for Error {
}
/// Returns client that is able to call RPCs on Substrate node.
pub fn client(uri: &str, signer: sp_core::sr25519::Pair) -> Client {
pub fn client(uri: &str) -> Client {
let transport = HttpTransportClient::new(uri);
Client {
rpc_client: RawClient::new(transport),
signer,
genesis_hash: None,
}
}
@@ -78,7 +75,7 @@ pub async fn best_ethereum_block(client: Client) -> (Client, Result<EthereumHead
]),
)
.await;
(client, result.map(|(num, hash)| EthereumHeaderId(num, hash)))
(client, result.map(|(num, hash)| HeaderId(num, hash)))
}
/// Returns true if transactions receipts are required for Ethereum header submission.
@@ -86,7 +83,7 @@ pub async fn ethereum_receipts_required(
client: Client,
header: QueuedEthereumHeader,
) -> (Client, Result<(EthereumHeaderId, bool), Error>) {
let id = header.id();
let id = header.header().id();
let header = into_substrate_ethereum_header(header.header());
let encoded_header = header.encode();
let (client, receipts_required) = call_rpc(
@@ -131,11 +128,12 @@ pub async fn ethereum_header_known(
/// Submits Ethereum header to Substrate runtime.
pub async fn submit_ethereum_headers(
client: Client,
signer: sp_core::sr25519::Pair,
headers: Vec<QueuedEthereumHeader>,
sign_transactions: bool,
) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) {
match sign_transactions {
true => submit_signed_ethereum_headers(client, headers).await,
true => submit_signed_ethereum_headers(client, signer, headers).await,
false => submit_unsigned_ethereum_headers(client, headers).await,
}
}
@@ -143,6 +141,7 @@ pub async fn submit_ethereum_headers(
/// Submits signed Ethereum header to Substrate runtime.
pub async fn submit_signed_ethereum_headers(
client: Client,
signer: sp_core::sr25519::Pair,
headers: Vec<QueuedEthereumHeader>,
) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) {
let ids = headers.iter().map(|header| header.id()).collect();
@@ -158,15 +157,14 @@ pub async fn submit_signed_ethereum_headers(
(client, genesis_hash)
}
};
let account_id = client.signer.public().as_array_ref().clone().into();
let account_id = signer.public().as_array_ref().clone().into();
let (client, nonce) = next_account_index(client, account_id).await;
let nonce = match nonce {
Ok(nonce) => nonce,
Err(err) => return (client, Err(err)),
};
let transaction = create_signed_submit_transaction(headers, &client.signer, nonce, genesis_hash);
let transaction = create_signed_submit_transaction(headers, &signer, nonce, genesis_hash);
let encoded_transaction = transaction.encode();
let (client, transaction_hash) = call_rpc(
client,
@@ -14,69 +14,97 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::ethereum_headers::QueuedHeaders;
use crate::ethereum_sync_loop::{EthereumSyncParams, SubstrateTransactionMode};
use crate::ethereum_types::{HeaderId, HeaderStatus, QueuedHeader};
use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts};
use codec::Encode;
use crate::headers::QueuedHeaders;
use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeader};
use num_traits::{One, Saturating};
/// Ethereum headers synchronization context.
/// Common sync params.
#[derive(Debug)]
pub struct HeadersSync {
/// Synchronization parameters.
params: EthereumSyncParams,
/// Best header number known to Ethereum node.
target_header_number: Option<u64>,
/// Best header known to Substrate node.
best_header: Option<HeaderId>,
/// Headers queue.
headers: QueuedHeaders,
pub struct HeadersSyncParams {
/// Maximal number of ethereum headers to pre-download.
pub max_future_headers_to_download: usize,
/// Maximal number of active (we believe) submit header transactions.
pub max_headers_in_submitted_status: usize,
/// Maximal number of headers in single submit request.
pub max_headers_in_single_submit: usize,
/// Maximal total headers size in single submit request.
pub max_headers_size_in_single_submit: usize,
/// We only may store and accept (from Ethereum node) headers that have
/// number >= than best_substrate_header.number - prune_depth.
pub prune_depth: u32,
/// Target transactions mode.
pub target_tx_mode: TargetTransactionMode,
}
impl HeadersSync {
/// Creates new Ethereum headers synchronizer.
pub fn new(params: EthereumSyncParams) -> Self {
/// Target transaction mode.
#[derive(Debug, PartialEq)]
pub enum TargetTransactionMode {
/// Submit new headers using signed transactions.
Signed,
/// Submit new headers using unsigned transactions.
Unsigned,
/// Submit new headers using signed transactions, but only when we
/// believe that sync has stalled.
Backup,
}
/// Headers synchronization context.
#[derive(Debug)]
pub struct HeadersSync<P: HeadersSyncPipeline> {
/// Synchronization parameters.
params: HeadersSyncParams,
/// Best header number known to source node.
source_best_number: Option<P::Number>,
/// Best header known to target node.
target_best_header: Option<HeaderId<P::Hash, P::Number>>,
/// Headers queue.
headers: QueuedHeaders<P>,
}
impl<P: HeadersSyncPipeline> HeadersSync<P> {
/// Creates new headers synchronizer.
pub fn new(params: HeadersSyncParams) -> Self {
HeadersSync {
headers: QueuedHeaders::new(),
params,
target_header_number: None,
best_header: None,
headers: Default::default(),
source_best_number: None,
target_best_header: None,
}
}
/// Returns true if we have synced almost all known headers.
pub fn is_almost_synced(&self) -> bool {
match self.target_header_number {
Some(target_header_number) => self
.best_header
.map(|best| target_header_number.saturating_sub(best.0) < 4)
match self.source_best_number {
Some(source_best_number) => self
.target_best_header
.map(|best| source_best_number.saturating_sub(best.0) < 4.into())
.unwrap_or(false),
None => true,
}
}
/// Returns synchronization status.
pub fn status(&self) -> (&Option<HeaderId>, &Option<u64>) {
(&self.best_header, &self.target_header_number)
pub fn status(&self) -> (&Option<HeaderId<P::Hash, P::Number>>, &Option<P::Number>) {
(&self.target_best_header, &self.source_best_number)
}
/// Returns reference to the headers queue.
pub fn headers(&self) -> &QueuedHeaders {
pub fn headers(&self) -> &QueuedHeaders<P> {
&self.headers
}
/// Returns mutable reference to the headers queue.
pub fn headers_mut(&mut self) -> &mut QueuedHeaders {
pub fn headers_mut(&mut self) -> &mut QueuedHeaders<P> {
&mut self.headers
}
/// Select header that needs to be downloaded from the Ethereum node.
pub fn select_new_header_to_download(&self) -> Option<u64> {
// if we haven't received best header from Ethereum node yet, there's nothing we can download
let target_header_number = self.target_header_number.clone()?;
/// Select header that needs to be downloaded from the source node.
pub fn select_new_header_to_download(&self) -> Option<P::Number> {
// if we haven't received best header from source node yet, there's nothing we can download
let source_best_number = self.source_best_number.clone()?;
// if we haven't received known best header from Substrate node yet, there's nothing we can download
let best_header = self.best_header.as_ref()?;
// if we haven't received known best header from target node yet, there's nothing we can download
let target_best_header = self.target_best_header.as_ref()?;
// if there's too many headers in the queue, stop downloading
let in_memory_headers = self.headers.total_headers();
@@ -85,19 +113,19 @@ impl HeadersSync {
}
// we assume that there were no reorgs if we have already downloaded best header
let best_downloaded_number = std::cmp::max(self.headers.best_queued_number(), best_header.0);
if best_downloaded_number == target_header_number {
let best_downloaded_number = std::cmp::max(self.headers.best_queued_number(), target_best_header.0);
if best_downloaded_number == source_best_number {
return None;
}
// download new header
Some(best_downloaded_number + 1)
Some(best_downloaded_number + One::one())
}
/// Select headers that need to be submitted to the Substrate node.
pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader>> {
/// Select headers that need to be submitted to the target node.
pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader<P>>> {
// if we operate in backup mode, we only submit headers when sync has stalled
if self.params.sub_tx_mode == SubstrateTransactionMode::Backup && !stalled {
if self.params.target_tx_mode == TargetTransactionMode::Backup && !stalled {
return None;
}
@@ -117,10 +145,7 @@ impl HeadersSync {
return false;
}
let encoded_size = into_substrate_ethereum_header(header.header()).encode().len()
+ into_substrate_ethereum_receipts(header.receipts())
.map(|receipts| receipts.encode().len())
.unwrap_or(0);
let encoded_size = P::estimate_size(header);
if total_headers != 0 && total_size + encoded_size > self.params.max_headers_size_in_single_submit {
return false;
}
@@ -132,48 +157,72 @@ impl HeadersSync {
})
}
/// Receive new target header number from the Ethereum node.
pub fn ethereum_best_header_number_response(&mut self, best_header_number: u64) {
log::debug!(target: "bridge", "Received best header number from Ethereum: {}", best_header_number);
self.target_header_number = Some(best_header_number);
/// Receive new target header number from the source node.
pub fn source_best_header_number_response(&mut self, best_header_number: P::Number) {
log::debug!(
target: "bridge",
"Received best header number from {} node: {}",
P::SOURCE_NAME,
best_header_number,
);
self.source_best_number = Some(best_header_number);
}
/// Receive new best header from the Substrate node.
/// Receive new best header from the target node.
/// Returns true if it is different from the previous block known to us.
pub fn substrate_best_header_response(&mut self, best_header: HeaderId) -> bool {
log::debug!(target: "bridge", "Received best known header from Substrate: {:?}", best_header);
pub fn target_best_header_response(&mut self, best_header: HeaderId<P::Hash, P::Number>) -> bool {
log::debug!(
target: "bridge",
"Received best known header from {}: {:?}",
P::TARGET_NAME,
best_header,
);
// early return if it is still the same
if self.best_header == Some(best_header) {
if self.target_best_header == Some(best_header) {
return false;
}
// remember that this header is now known to the Substrate runtime
self.headers.substrate_best_header_response(&best_header);
self.headers.target_best_header_response(&best_header);
// prune ancient headers
self.headers
.prune(best_header.0.saturating_sub(self.params.prune_depth));
.prune(best_header.0.saturating_sub(self.params.prune_depth.into()));
// finally remember the best header itself
self.best_header = Some(best_header);
self.target_best_header = Some(best_header);
true
}
/// Restart synchronization.
pub fn restart(&mut self) {
self.target_header_number = None;
self.best_header = None;
self.source_best_number = None;
self.target_best_header = None;
self.headers.clear();
}
}
impl Default for HeadersSyncParams {
fn default() -> Self {
HeadersSyncParams {
max_future_headers_to_download: 128,
max_headers_in_submitted_status: 128,
max_headers_in_single_submit: 32,
max_headers_size_in_single_submit: 131_072,
prune_depth: 4096,
target_tx_mode: TargetTransactionMode::Signed,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ethereum_headers::tests::{header, id};
use crate::ethereum_types::{HeaderStatus, H256};
use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256};
use crate::headers::tests::{header, id};
use crate::sync_types::HeaderStatus;
fn side_hash(number: u64) -> H256 {
H256::from_low_u64_le(1000 + number)
@@ -181,26 +230,26 @@ mod tests {
#[test]
fn select_new_header_to_download_works() {
let mut eth_sync = HeadersSync::new(Default::default());
let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(Default::default());
// both best && target headers are unknown
assert_eq!(eth_sync.select_new_header_to_download(), None);
// best header is known, target header is unknown
eth_sync.best_header = Some(HeaderId(0, Default::default()));
eth_sync.target_best_header = Some(HeaderId(0, Default::default()));
assert_eq!(eth_sync.select_new_header_to_download(), None);
// target header is known, best header is unknown
eth_sync.best_header = None;
eth_sync.target_header_number = Some(100);
eth_sync.target_best_header = None;
eth_sync.source_best_number = Some(100);
assert_eq!(eth_sync.select_new_header_to_download(), None);
// when our best block has the same number as the target
eth_sync.best_header = Some(HeaderId(100, Default::default()));
eth_sync.target_best_header = Some(HeaderId(100, Default::default()));
assert_eq!(eth_sync.select_new_header_to_download(), None);
// when we actually need a new header
eth_sync.target_header_number = Some(101);
eth_sync.source_best_number = Some(101);
assert_eq!(eth_sync.select_new_header_to_download(), Some(101));
// when there are too many headers scheduled for submitting
@@ -216,18 +265,18 @@ mod tests {
eth_sync.params.max_headers_in_submitted_status = 1;
// ethereum reports best header #102
eth_sync.ethereum_best_header_number_response(102);
eth_sync.source_best_header_number_response(102);
// substrate reports that it is at block #100
eth_sync.substrate_best_header_response(id(100));
eth_sync.target_best_header_response(id(100));
// block #101 is downloaded first
assert_eq!(eth_sync.select_new_header_to_download(), Some(101));
eth_sync.headers.header_response(header(101).header().clone());
// now header #101 is ready to be submitted
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(101)));
eth_sync.headers.maybe_receipts_response(&id(101), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(101)));
eth_sync.headers.maybe_extra_response(&id(101), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(101)));
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)]));
@@ -240,20 +289,20 @@ mod tests {
// we have nothing to submit because previous header hasn't been confirmed yet
// (and we allow max 1 submit transaction in the wild)
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(102)));
eth_sync.headers.maybe_receipts_response(&id(102), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(102)));
eth_sync.headers.maybe_extra_response(&id(102), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(102)));
assert_eq!(eth_sync.select_headers_to_submit(false), None);
// substrate reports that it has imported block #101
eth_sync.substrate_best_header_response(id(101));
eth_sync.target_best_header_response(id(101));
// and we are ready to submit #102
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(102)]));
eth_sync.headers.headers_submitted(vec![id(102)]);
// substrate reports that it has imported block #102
eth_sync.substrate_best_header_response(id(102));
eth_sync.target_best_header_response(id(102));
// and we have nothing to download
assert_eq!(eth_sync.select_new_header_to_download(), None);
@@ -264,10 +313,10 @@ mod tests {
let mut eth_sync = HeadersSync::new(Default::default());
// ethereum reports best header #102
eth_sync.ethereum_best_header_number_response(102);
eth_sync.source_best_header_number_response(102);
// substrate reports that it is at block #100, but it isn't part of best chain
eth_sync.substrate_best_header_response(HeaderId(100, side_hash(100)));
eth_sync.target_best_header_response(HeaderId(100, side_hash(100)));
// block #101 is downloaded first
assert_eq!(eth_sync.select_new_header_to_download(), Some(101));
@@ -296,40 +345,40 @@ mod tests {
eth_sync.headers.maybe_orphan_response(&id(99), true);
// and we are ready to submit #100
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(100)));
eth_sync.headers.maybe_receipts_response(&id(100), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(100)));
eth_sync.headers.maybe_extra_response(&id(100), false);
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(100)]));
eth_sync.headers.headers_submitted(vec![id(100)]);
// and we are ready to submit #101
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(101)));
eth_sync.headers.maybe_receipts_response(&id(101), false);
assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(101)));
eth_sync.headers.maybe_extra_response(&id(101), false);
assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)]));
eth_sync.headers.headers_submitted(vec![id(101)]);
}
#[test]
fn pruning_happens_on_substrate_best_header_response() {
let mut eth_sync = HeadersSync::new(Default::default());
fn pruning_happens_on_target_best_header_response() {
let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(Default::default());
eth_sync.params.prune_depth = 50;
eth_sync.substrate_best_header_response(id(100));
eth_sync.target_best_header_response(id(100));
assert_eq!(eth_sync.headers.prune_border(), 50);
}
#[test]
fn only_submitting_headers_in_backup_mode_when_stalled() {
let mut eth_sync = HeadersSync::new(Default::default());
eth_sync.params.sub_tx_mode = SubstrateTransactionMode::Backup;
eth_sync.params.target_tx_mode = TargetTransactionMode::Backup;
// ethereum reports best header #102
eth_sync.ethereum_best_header_number_response(102);
eth_sync.source_best_header_number_response(102);
// substrate reports that it is at block #100
eth_sync.substrate_best_header_response(id(100));
eth_sync.target_best_header_response(id(100));
// block #101 is downloaded first
eth_sync.headers.header_response(header(101).header().clone());
eth_sync.headers.maybe_receipts_response(&id(101), false);
eth_sync.headers.maybe_extra_response(&id(101), false);
// ensure that headers are not submitted when sync is not stalled
assert_eq!(eth_sync.select_headers_to_submit(false), None);
+464
View File
@@ -0,0 +1,464 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::sync::HeadersSyncParams;
use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader};
use futures::{future::FutureExt, stream::StreamExt};
use num_traits::Saturating;
use std::future::Future;
/// When we submit headers to target node, but see no updates of best
/// source block known to target node during STALL_SYNC_TIMEOUT_MS milliseconds,
/// we consider that our headers are rejected because there has been reorg in target chain.
/// This reorg could invalidate our knowledge about sync process (i.e. we have asked if
/// HeaderA is known to target, but then reorg happened and the answer is different
/// now) => we need to reset sync.
/// The other option is to receive **EVERY** best target header and check if it is
/// direct child of previous best header. But: (1) subscription doesn't guarantee that
/// the subscriber will receive every best header (2) reorg won't always lead to sync
/// stall and restart is a heavy operation (we forget all in-memory headers).
const STALL_SYNC_TIMEOUT_MS: u64 = 30_000;
/// Delay (in milliseconds) after we have seen update of best source header at target node,
/// for us to treat sync stalled. ONLY when relay operates in backup mode.
const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000;
/// Delay (in milliseconds) after connection-related error happened before we'll try
/// reconnection again.
const CONNECTION_ERROR_DELAY_MS: u64 = 10_000;
/// Source client trait.
pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Future that returns best block number.
type BestBlockNumberFuture: Future<Output = (Self, Result<P::Number, Self::Error>)>;
/// Future that returns header by hash.
type HeaderByHashFuture: Future<Output = (Self, Result<P::Header, Self::Error>)>;
/// Future that returns header by number.
type HeaderByNumberFuture: Future<Output = (Self, Result<P::Header, Self::Error>)>;
/// Future that returns extra data associated with header.
type HeaderExtraFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, P::Extra), Self::Error>)>;
/// Get best block number.
fn best_block_number(self) -> Self::BestBlockNumberFuture;
/// Get header by hash.
fn header_by_hash(self, hash: P::Hash) -> Self::HeaderByHashFuture;
/// Get canonical header by number.
fn header_by_number(self, number: P::Number) -> Self::HeaderByNumberFuture;
/// Get extra data by header hash.
fn header_extra(self, id: HeaderId<P::Hash, P::Number>, header: &P::Header) -> Self::HeaderExtraFuture;
}
/// Target client trait.
pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Future that returns best header id.
type BestHeaderIdFuture: Future<Output = (Self, Result<HeaderId<P::Hash, P::Number>, Self::Error>)>;
/// Future that returns known header check result.
type IsKnownHeaderFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>)>;
/// Future that returns extra check result.
type RequiresExtraFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>)>;
/// Future that returns header submission result.
type SubmitHeadersFuture: Future<Output = (Self, Result<Vec<HeaderId<P::Hash, P::Number>>, Self::Error>)>;
/// Returns ID of best header known to the target node.
fn best_header_id(self) -> Self::BestHeaderIdFuture;
/// Returns true if header is known to the target node.
fn is_known_header(self, id: HeaderId<P::Hash, P::Number>) -> Self::IsKnownHeaderFuture;
/// Returns true if header requires extra data to be submitted.
fn requires_extra(self, header: &QueuedHeader<P>) -> Self::RequiresExtraFuture;
/// Submit headers.
fn submit_headers(self, headers: Vec<QueuedHeader<P>>) -> Self::SubmitHeadersFuture;
}
/// Run headers synchronization.
pub fn run<P: HeadersSyncPipeline>(
source_client: impl SourceClient<P>,
source_tick_ms: u64,
target_client: impl TargetClient<P>,
target_tick_ms: u64,
sync_params: HeadersSyncParams,
) {
let mut local_pool = futures::executor::LocalPool::new();
let mut progress_context = (std::time::Instant::now(), None, None);
local_pool.run_until(async move {
let mut sync = crate::sync::HeadersSync::<P>::new(sync_params);
let mut stall_countdown = None;
let mut last_update_time = std::time::Instant::now();
let mut source_maybe_client = None;
let mut source_best_block_number_required = false;
let source_best_block_number_future = source_client.best_block_number().fuse();
let source_new_header_future = futures::future::Fuse::terminated();
let source_orphan_header_future = futures::future::Fuse::terminated();
let source_extra_future = futures::future::Fuse::terminated();
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(source_tick_ms).fuse();
let mut target_maybe_client = None;
let mut target_best_block_required = false;
let target_best_block_future = target_client.best_header_id().fuse();
let target_extra_check_future = futures::future::Fuse::terminated();
let target_existence_status_future = futures::future::Fuse::terminated();
let target_submit_header_future = futures::future::Fuse::terminated();
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(target_tick_ms).fuse();
futures::pin_mut!(
source_best_block_number_future,
source_new_header_future,
source_orphan_header_future,
source_extra_future,
source_go_offline_future,
source_tick_stream,
target_best_block_future,
target_extra_check_future,
target_existence_status_future,
target_submit_header_future,
target_go_offline_future,
target_tick_stream
);
loop {
futures::select! {
(source_client, source_best_block_number) = source_best_block_number_future => {
source_best_block_number_required = false;
process_future_result(
&mut source_maybe_client,
source_client,
source_best_block_number,
|source_best_block_number| sync.source_best_header_number_response(source_best_block_number),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|| format!("Error retrieving best header number from {}", P::SOURCE_NAME),
);
},
(source_client, source_new_header) = source_new_header_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_new_header,
|source_new_header| sync.headers_mut().header_response(source_new_header),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|| format!("Error retrieving header from {} node", P::SOURCE_NAME),
);
},
(source_client, source_orphan_header) = source_orphan_header_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_orphan_header,
|source_orphan_header| sync.headers_mut().header_response(source_orphan_header),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|| format!("Error retrieving orphan header from {} node", P::SOURCE_NAME),
);
},
(source_client, source_extra) = source_extra_future => {
process_future_result(
&mut source_maybe_client,
source_client,
source_extra,
|(header, extra)| sync.headers_mut().extra_response(&header, extra),
&mut source_go_offline_future,
|source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client),
|| format!("Error retrieving extra data from {} node", P::SOURCE_NAME),
);
},
source_client = source_go_offline_future => {
source_maybe_client = Some(source_client);
},
_ = source_tick_stream.next() => {
if sync.is_almost_synced() {
source_best_block_number_required = true;
}
},
(target_client, target_best_block) = target_best_block_future => {
target_best_block_required = false;
process_future_result(
&mut target_maybe_client,
target_client,
target_best_block,
|target_best_block| {
let head_updated = sync.target_best_header_response(target_best_block);
if head_updated {
last_update_time = std::time::Instant::now();
}
match head_updated {
// IF head is updated AND there are still our transactions:
// => restart stall countdown timer
true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 =>
stall_countdown = Some(std::time::Instant::now()),
// IF head is updated AND there are no our transactions:
// => stop stall countdown timer
true => stall_countdown = None,
// IF head is not updated AND stall countdown is not yet completed
// => do nothing
false if stall_countdown
.map(|stall_countdown| std::time::Instant::now() - stall_countdown <
std::time::Duration::from_millis(STALL_SYNC_TIMEOUT_MS))
.unwrap_or(true)
=> (),
// IF head is not updated AND stall countdown has completed
// => restart sync
false => {
log::info!(
target: "bridge",
"Possible {} fork detected. Restarting {} headers synchronization.",
P::TARGET_NAME,
P::SOURCE_NAME,
);
stall_countdown = None;
sync.restart();
},
}
},
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|| format!("Error retrieving best known header from {} node", P::TARGET_NAME),
);
},
(target_client, target_existence_status) = target_existence_status_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_existence_status,
|(target_header, target_existence_status)| sync
.headers_mut()
.maybe_orphan_response(&target_header, target_existence_status),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|| format!("Error retrieving existence status from {} node", P::TARGET_NAME),
);
},
(target_client, target_submit_header_result) = target_submit_header_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_submit_header_result,
|submitted_headers| sync.headers_mut().headers_submitted(submitted_headers),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|| format!("Error submitting headers to {} node", P::TARGET_NAME),
);
},
(target_client, target_extra_check_result) = target_extra_check_future => {
process_future_result(
&mut target_maybe_client,
target_client,
target_extra_check_result,
|(header, extra_check_result)| sync
.headers_mut()
.maybe_extra_response(&header, extra_check_result),
&mut target_go_offline_future,
|target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client),
|| format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME),
);
},
target_client = target_go_offline_future => {
target_maybe_client = Some(target_client);
},
_ = target_tick_stream.next() => {
target_best_block_required = true;
},
}
// print progress
progress_context = print_sync_progress(progress_context, &sync);
// if target client is available: wait, or call required target methods
if let Some(target_client) = target_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely;
// 2) check if we need extra data from source - it stops us from downloading/submitting new blocks;
// 3) check existence - it stops us from submitting new blocks;
// 4) submit header
if target_best_block_required {
log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME);
target_best_block_future.set(target_client.best_header_id().fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) {
log::debug!(
target: "bridge",
"Checking if header submission requires extra: {:?}",
header.id(),
);
target_extra_check_future.set(target_client.requires_extra(header).fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::MaybeOrphan) {
// for MaybeOrphan we actually ask for parent' header existence
let parent_id = header.parent_id();
log::debug!(
target: "bridge",
"Asking {} node for existence of: {:?}",
P::TARGET_NAME,
parent_id,
);
target_existence_status_future.set(target_client.is_known_header(parent_id).fuse());
} else if let Some(headers) = sync.select_headers_to_submit(
last_update_time.elapsed() > std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS),
) {
let ids = match headers.len() {
1 => format!("{:?}", headers[0].id()),
2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()),
len => format!("[{:?} ... {:?}]", headers[0].id(), headers[len - 1].id()),
};
log::debug!(
target: "bridge",
"Submitting {} header(s) to {} node: {:?}",
headers.len(),
P::TARGET_NAME,
ids,
);
let headers = headers.into_iter().cloned().collect();
target_submit_header_future.set(target_client.submit_headers(headers).fuse());
// remember that we have submitted some headers
if stall_countdown.is_none() {
stall_countdown = Some(std::time::Instant::now());
}
} else {
target_maybe_client = Some(target_client);
}
}
// if source client is available: wait, or call required source methods
if let Some(source_client) = source_maybe_client.take() {
// the priority is to:
// 1) get best block - it stops us from downloading new blocks + we call it rarely;
// 2) download extra data - it stops us from submitting new blocks;
// 3) download missing headers - it stops us from downloading/submitting new blocks;
// 4) downloading new headers
if source_best_block_number_required {
log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME);
source_best_block_number_future.set(source_client.best_block_number().fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::Extra) {
let id = header.id();
log::debug!(
target: "bridge",
"Retrieving extra data for header: {:?}",
id,
);
source_extra_future.set(source_client.header_extra(id, header.header()).fuse());
} else if let Some(header) = sync.headers().header(HeaderStatus::Orphan) {
// for Orphan we actually ask for parent' header
let parent_id = header.parent_id();
log::debug!(
target: "bridge",
"Going to download orphan header from {} node: {:?}",
P::SOURCE_NAME,
parent_id,
);
source_orphan_header_future.set(source_client.header_by_hash(parent_id.1).fuse());
} else if let Some(id) = sync.select_new_header_to_download() {
log::debug!(
target: "bridge",
"Going to download new header from {} node: {:?}",
P::SOURCE_NAME,
id,
);
source_new_header_future.set(source_client.header_by_number(id).fuse());
} else {
source_maybe_client = Some(source_client);
}
}
}
});
}
/// Future that resolves into given value after given timeout.
async fn delay<T>(timeout_ms: u64, retval: T) -> T {
async_std::task::sleep(std::time::Duration::from_millis(timeout_ms)).await;
retval
}
/// Stream that emits item every `timeout_ms` milliseconds.
fn interval(timeout_ms: u64) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
delay(timeout_ms, ()).await;
Some(((), ()))
})
}
/// Process result of the future that may have been caused by connection failure.
fn process_future_result<TClient, TResult, TError, TGoOfflineFuture>(
maybe_client: &mut Option<TClient>,
client: TClient,
result: Result<TResult, TError>,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce(TClient) -> TGoOfflineFuture,
error_pattern: impl FnOnce() -> String,
) where
TError: std::fmt::Debug + MaybeConnectionError,
TGoOfflineFuture: FutureExt,
{
match result {
Ok(result) => {
*maybe_client = Some(client);
on_success(result);
}
Err(error) => {
if error.is_connection_error() {
go_offline_future.set(go_offline(client).fuse());
} else {
*maybe_client = Some(client);
}
log::error!(target: "bridge", "{}: {:?}", error_pattern(), error);
}
}
}
/// Print synchronization progress.
fn print_sync_progress<P: HeadersSyncPipeline>(
progress_context: (std::time::Instant, Option<P::Number>, Option<P::Number>),
eth_sync: &crate::sync::HeadersSync<P>,
) -> (std::time::Instant, Option<P::Number>, Option<P::Number>) {
let (prev_time, prev_best_header, prev_target_header) = progress_context;
let now_time = std::time::Instant::now();
let (now_best_header, now_target_header) = eth_sync.status();
let need_update = now_time - prev_time > std::time::Duration::from_secs(10)
|| match (prev_best_header, now_best_header) {
(Some(prev_best_header), Some(now_best_header)) => {
now_best_header.0.saturating_sub(prev_best_header) > 10.into()
}
_ => false,
};
if !need_update {
return (prev_time, prev_best_header, prev_target_header);
}
log::info!(
target: "bridge",
"Synced {:?} of {:?} headers",
now_best_header.map(|id| id.0),
now_target_header,
);
(now_time, now_best_header.clone().map(|id| id.0), *now_target_header)
}
+130
View File
@@ -0,0 +1,130 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
/// Ethereum header Id.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);
/// Ethereum header synchronization status.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum HeaderStatus {
/// Header is unknown.
Unknown,
/// Header is in MaybeOrphan queue.
MaybeOrphan,
/// Header is in Orphan queue.
Orphan,
/// Header is in MaybeExtra queue.
MaybeExtra,
/// Header is in Extra queue.
Extra,
/// Header is in Ready queue.
Ready,
/// Header has been recently submitted to the target node.
Submitted,
/// Header is known to the target node.
Synced,
}
/// Error type that can signal connection errors.
pub trait MaybeConnectionError {
/// Returns true if error (maybe) represents connection error.
fn is_connection_error(&self) -> bool;
}
/// Headers synchronization pipeline.
pub trait HeadersSyncPipeline: Clone + Copy {
/// Name of the headers source.
const SOURCE_NAME: &'static str;
/// Name of the headers target.
const TARGET_NAME: &'static str;
/// Headers we're syncing are identified by this hash.
type Hash: Eq + Clone + Copy + std::fmt::Debug + std::fmt::Display + std::hash::Hash;
/// Headers we're syncing are identified by this number.
type Number: From<u32>
+ Ord
+ Clone
+ Copy
+ std::fmt::Debug
+ std::fmt::Display
+ std::ops::Add<Output = Self::Number>
+ std::ops::Sub<Output = Self::Number>
+ num_traits::Saturating
+ num_traits::Zero
+ num_traits::One;
/// Type of header that we're syncing.
type Header: Clone + std::fmt::Debug + SourceHeader<Self::Hash, Self::Number>;
/// Type of extra data for the header that we're receiving from the source node.
type Extra: Clone + std::fmt::Debug;
/// Function used to convert from queued header to target header.
fn estimate_size(source: &QueuedHeader<Self>) -> usize;
}
/// Header that we're receiving from source node.
pub trait SourceHeader<Hash, Number> {
/// Returns ID of header.
fn id(&self) -> HeaderId<Hash, Number>;
/// Returns ID of parent header.
fn parent_id(&self) -> HeaderId<Hash, Number>;
}
/// Header how it's stored in the synchronization queue.
#[derive(Clone, Debug, Default)]
#[cfg_attr(test, derive(PartialEq))]
pub struct QueuedHeader<P: HeadersSyncPipeline> {
header: P::Header,
extra: Option<P::Extra>,
}
impl<P: HeadersSyncPipeline> QueuedHeader<P> {
/// Creates new queued header.
pub fn new(header: P::Header) -> Self {
QueuedHeader { header, extra: None }
}
/// Returns ID of header.
pub fn id(&self) -> HeaderId<P::Hash, P::Number> {
self.header.id()
}
/// Returns ID of parent header.
pub fn parent_id(&self) -> HeaderId<P::Hash, P::Number> {
self.header.parent_id()
}
/// Returns reference to header.
pub fn header(&self) -> &P::Header {
&self.header
}
/// Returns reference to associated extra data.
pub fn extra(&self) -> &Option<P::Extra> {
&self.extra
}
/// Extract header and extra from self.
pub fn extract(self) -> (P::Header, Option<P::Extra>) {
(self.header, self.extra)
}
/// Set associated extra data.
pub fn set_extra(mut self, extra: P::Extra) -> Self {
self.extra = Some(extra);
self
}
}